presence: define modes for publ_cache parameter
[sip-router] / src / modules / presence / notify.c
1 /*
2  * presence module- presence server implementation
3  *
4  * Copyright (C) 2006 Voice Sistem S.R.L.
5  *
6  * This file is part of Kamailio, a free SIP server.
7  *
8  * Kamailio is free software; you can redistribute it and/or modify
9  * it under the terms of the GNU General Public License as published by
10  * the Free Software Foundation; either version 2 of the License, or
11  * (at your option) any later version
12  *
13  * Kamailio is distributed in the hope that it will be useful,
14  * but WITHOUT ANY WARRANTY; without even the implied warranty of
15  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
16  * GNU General Public License for more details.
17  *
18  * You should have received a copy of the GNU General Public License
19  * along with this program; if not, write to the Free Software
20  * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301  USA
21  *
22  */
23
24 /*! \file
25  * \brief Kamailio presence module :: Notification with SIP NOTIFY
26  * \ingroup presence
27  */
28
29
30 #include <stdio.h>
31 #include <stdlib.h>
32 #include <string.h>
33 #include <libxml/parser.h>
34
35 #include "../../core/trim.h"
36 #include "../../core/ut.h"
37 #include "../../core/globals.h"
38 #include "../../core/str.h"
39 #include "../../lib/srdb1/db.h"
40 #include "../../lib/srdb1/db_val.h"
41 #include "../../core/hashes.h"
42 #include "../../core/socket_info.h"
43 #include "../../modules/tm/tm_load.h"
44 #include "../pua/hash.h"
45 #include "presentity.h"
46 #include "presence.h"
47 #include "notify.h"
48 #include "utils_func.h"
49 #include "../../core/receive.h"
50
/* MAX_FORWARD is the value written into the Max-Forwards header by
 * build_str_hdr(). ALLOC_SIZE is not referenced in the code visible here. */
51 #define ALLOC_SIZE 3000
52 #define MAX_FORWARD 70
53
54 int goto_on_notify_reply = -1;
55
/* Module-wide settings and state defined in other translation units. */
56 extern int pres_local_log_level;
57 extern int pres_local_log_facility;
58 extern subs_t *_pres_subs_last_sub;
59 extern int _pres_subs_mode;
60
/* Forward declarations for helpers used before their definition. */
61 c_back_param *shm_dup_cbparam(subs_t *);
62 void free_cbparam(c_back_param *cb_param);
63
64 void p_tm_callback(struct cell *t, int type, struct tmcb_params *ps);
65 int add_waiting_watchers(watcher_t *watchers, str pres_uri, str event);
66 int add_watcher_list(subs_t *s, watcher_t *watchers);
67 str *create_winfo_xml(watcher_t *watchers, char *version, str resource,
68                 str event, int STATE_FLAG);
69 void free_watcher_list(watcher_t *watchers);
70
/* Database column names wrapped as str constants. Their addresses are used
 * as query keys and result-column keys in the DB calls of this file. */
71 str str_to_user_col = str_init("to_user");
72 str str_username_col = str_init("username");
73 str str_domain_col = str_init("domain");
74 str str_body_col = str_init("body");
75 str str_to_domain_col = str_init("to_domain");
76 str str_from_user_col = str_init("from_user");
77 str str_from_domain_col = str_init("from_domain");
78 str str_watcher_username_col = str_init("watcher_username");
79 str str_watcher_domain_col = str_init("watcher_domain");
80 str str_event_id_col = str_init("event_id");
81 str str_event_col = str_init("event");
82 str str_etag_col = str_init("etag");
83 str str_ruid_col = str_init("ruid");
84 str str_from_tag_col = str_init("from_tag");
85 str str_to_tag_col = str_init("to_tag");
86 str str_callid_col = str_init("callid");
87 str str_local_cseq_col = str_init("local_cseq");
88 str str_remote_cseq_col = str_init("remote_cseq");
89 str str_record_route_col = str_init("record_route");
90 str str_contact_col = str_init("contact");
91 str str_expires_col = str_init("expires");
92 str str_status_col = str_init("status");
93 str str_reason_col = str_init("reason");
94 str str_socket_info_col = str_init("socket_info");
95 str str_local_contact_col = str_init("local_contact");
96 str str_version_col = str_init("version");
97 str str_presentity_uri_col = str_init("presentity_uri");
98 str str_inserted_time_col = str_init("inserted_time");
99 str str_received_time_col = str_init("received_time");
100 str str_id_col = str_init("id");
101 str str_sender_col = str_init("sender");
102 str str_updated_col = str_init("updated");
103 str str_updated_winfo_col = str_init("updated_winfo");
104 str str_priority_col = str_init("priority");
105 str str_flags_col = str_init("flags");
106 str str_user_agent_col = str_init("user_agent");
107
108 int subset = 0;
109
110 char *get_status_str(int status_flag)
111 {
112         switch(status_flag) {
113                 case ACTIVE_STATUS:
114                         return "active";
115                 case PENDING_STATUS:
116                         return "pending";
117                 case TERMINATED_STATUS:
118                         return "terminated";
119                 case WAITING_STATUS:
120                         return "waiting";
121         }
122         return NULL;
123 }
124
125 void printf_subs(subs_t *subs)
126 {
127         LM_DBG("pres_uri: %.*s\n", subs->pres_uri.len, subs->pres_uri.s);
128         LM_DBG("watcher_user@watcher_domain: %.*s@%.*s\n", subs->watcher_user.len,
129                         subs->watcher_user.s, subs->watcher_domain.len,
130                         subs->watcher_domain.s);
131         LM_DBG("to_user@to_domain: %.*s@%.*s\n", subs->to_user.len, subs->to_user.s,
132                         subs->to_domain.len, subs->to_domain.s);
133         LM_DBG("from_user@from_domain: %.*s@%.*s\n", subs->from_user.len,
134                         subs->from_user.s, subs->from_domain.len, subs->from_domain.s);
135         LM_DBG("callid/from_tag/to_tag: %.*s/%.*s/%.*s\n", subs->callid.len,
136                         subs->callid.s, subs->from_tag.len, subs->from_tag.s,
137                         subs->to_tag.len, subs->to_tag.s);
138         LM_DBG("local_cseq/remote_cseq: %u/%u\n", subs->local_cseq,
139                         subs->remote_cseq);
140         LM_DBG("local_contact/contact: %.*s/%.*s\n", subs->local_contact.len,
141                         subs->local_contact.s, subs->contact.len, subs->contact.s);
142         LM_DBG("record_route: %.*s\n", subs->record_route.len,
143                         subs->record_route.s);
144         LM_DBG("sockinfo_str: %.*s\n", subs->sockinfo_str.len,
145                         subs->sockinfo_str.s);
146
147         LM_DBG("event: %.*s\n", subs->event->name.len, subs->event->name.s);
148         LM_DBG("status: %s\n", get_status_str(subs->status));
149         LM_DBG("reason: %.*s\n", subs->reason.len, subs->reason.s);
150         LM_DBG("version: %u\n", subs->version);
151         LM_DBG("expires: %u\n", subs->expires);
152
153         LM_DBG("updated/updated_winfo: %d/%d\n", subs->updated,
154                         subs->updated_winfo);
155 }
156
157 int build_str_hdr(subs_t *subs, int is_body, str *hdr)
158 {
159         pres_ev_t *event = subs->event;
160         str expires = {0, 0};
161         str status = {0, 0};
162         char *p;
163         str trans = {";transport=", 11};
164
165         if(hdr == NULL) {
166                 LM_ERR("bad parameter\n");
167                 return -1;
168         }
169         expires.s = int2str(subs->expires, &expires.len);
170
171         status.s = get_status_str(subs->status);
172         if(status.s == NULL) {
173                 LM_ERR("bad status %d\n", subs->status);
174                 return -1;
175         }
176         status.len = strlen(status.s);
177
178         hdr->len =
179                         18 /*Max-Forwards:  + val*/ + CRLF_LEN
180                         + 7 /*Event: */ + subs->event->name.len
181                         + 4 /*;id=*/ + subs->event_id.len + CRLF_LEN
182                         + 10 /*Contact: <*/ + subs->local_contact.len
183                         + 1 /*>*/ + 15 /*";transport=xxxx"*/ + CRLF_LEN
184                         + 20 /*Subscription-State: */ + status.len
185                         + 10 /*reason/expires params*/
186                         + (subs->reason.len > expires.len ? subs->reason.len : expires.len)
187                         + CRLF_LEN
188                         + (is_body ? (14 /*Content-Type: */ + subs->event->content_type.len
189                                                                  + CRLF_LEN)
190                                            : 0)
191                         + 1;
192
193         hdr->s = (char *)pkg_malloc(hdr->len);
194         if(hdr->s == NULL) {
195                 LM_ERR("no more pkg memory\n");
196                 return -1;
197         }
198
199         p = hdr->s;
200         p += sprintf(p, "Max-Forwards: %d\r\n", MAX_FORWARD);
201
202         p += sprintf(p, "Event: %.*s", event->name.len, event->name.s);
203         if(subs->event_id.len && subs->event_id.s) {
204                 p += sprintf(p, ";id=%.*s", subs->event_id.len, subs->event_id.s);
205         }
206         memcpy(p, CRLF, CRLF_LEN);
207         p += CRLF_LEN;
208
209         p += sprintf(p, "Contact: <%.*s", subs->local_contact.len,
210                         subs->local_contact.s);
211         if(subs->sockinfo_str.s != NULL
212                         && str_search(&subs->local_contact, &trans) == 0) {
213                 /* fix me */
214                 switch(subs->sockinfo_str.s[0]) {
215                         case 's':
216                         case 'S':
217                                 memcpy(p, ";transport=sctp", 15);
218                                 p += 15;
219                                 break;
220                         case 't':
221                         case 'T':
222                                 switch(subs->sockinfo_str.s[1]) {
223                                         case 'c':
224                                         case 'C':
225                                                 memcpy(p, ";transport=tcp", 14);
226                                                 p += 14;
227                                                 break;
228                                         case 'l':
229                                         case 'L':
230                                                 memcpy(p, ";transport=tls", 14);
231                                                 p += 14;
232                                                 break;
233                                 }
234                                 break;
235                 }
236         }
237         *p = '>';
238         p++;
239         memcpy(p, CRLF, CRLF_LEN);
240         p += CRLF_LEN;
241
242         p += sprintf(p, "Subscription-State: %.*s", status.len, status.s);
243
244         if(subs->status == TERMINATED_STATUS) {
245                 LM_DBG("state = terminated\n");
246                 p += sprintf(p, ";reason=%.*s", subs->reason.len, subs->reason.s);
247         } else {
248                 p += sprintf(p, ";expires=%.*s", expires.len, expires.s);
249         }
250         memcpy(p, CRLF, CRLF_LEN);
251         p += CRLF_LEN;
252
253         if(is_body) {
254                 p += sprintf(p, "Content-Type: %.*s\r\n", event->content_type.len,
255                                 event->content_type.s);
256         }
257
258         *p = '\0';
259         hdr->len = p - hdr->s;
260
261         return 0;
262 }
263
264 int get_wi_subs_db(subs_t *subs, watcher_t *watchers)
265 {
266         subs_t sb;
267         db_key_t query_cols[3];
268         db_op_t query_ops[3];
269         db_val_t query_vals[3];
270         db_key_t result_cols[5];
271         db1_res_t *result = NULL;
272         db_row_t *row = NULL;
273         db_val_t *row_vals = NULL;
274         int n_result_cols = 0;
275         int n_query_cols = 0;
276         int i;
277         int status_col, watcher_user_col, watcher_domain_col, callid_col;
278
279         query_cols[n_query_cols] = &str_presentity_uri_col;
280         query_ops[n_query_cols] = OP_EQ;
281         query_vals[n_query_cols].type = DB1_STR;
282         query_vals[n_query_cols].nul = 0;
283         query_vals[n_query_cols].val.str_val = subs->pres_uri;
284         n_query_cols++;
285
286         query_cols[n_query_cols] = &str_event_col;
287         query_ops[n_query_cols] = OP_EQ;
288         query_vals[n_query_cols].type = DB1_STR;
289         query_vals[n_query_cols].nul = 0;
290         query_vals[n_query_cols].val.str_val = subs->event->wipeer->name;
291         n_query_cols++;
292
293         query_cols[n_query_cols] = &str_expires_col;
294         query_ops[n_query_cols] = OP_GT;
295         query_vals[n_query_cols].type = DB1_INT;
296         query_vals[n_query_cols].nul = 0;
297         query_vals[n_query_cols].val.int_val = (int)time(NULL) + pres_expires_offset;
298         n_query_cols++;
299
300         result_cols[status_col = n_result_cols++] = &str_status_col;
301         result_cols[watcher_user_col = n_result_cols++] = &str_watcher_username_col;
302         result_cols[watcher_domain_col = n_result_cols++] = &str_watcher_domain_col;
303         result_cols[callid_col = n_result_cols++] = &str_callid_col;
304
305         if(pa_dbf.use_table(pa_db, &active_watchers_table) < 0) {
306                 LM_ERR("in use_table\n");
307                 goto error;
308         }
309
310         if(pa_dbf.query(pa_db, query_cols, query_ops, query_vals, result_cols,
311                            n_query_cols, n_result_cols, 0, &result)
312                         < 0) {
313                 LM_ERR("querying active_watchers db table\n");
314                 goto error;
315         }
316
317         if(result == NULL) {
318                 goto error;
319         }
320
321         if(result->n <= 0) {
322                 LM_DBG("The query in db table for active subscription"
323                            " returned no result\n");
324                 pa_dbf.free_result(pa_db, result);
325                 return 0;
326         }
327
328         for(i = 0; i < result->n; i++) {
329                 row = &result->rows[i];
330                 row_vals = ROW_VALUES(row);
331
332                 sb.watcher_user.s = (char *)row_vals[watcher_user_col].val.string_val;
333                 sb.watcher_user.len = strlen(sb.watcher_user.s);
334
335                 sb.watcher_domain.s =
336                                 (char *)row_vals[watcher_domain_col].val.string_val;
337                 sb.watcher_domain.len = strlen(sb.watcher_domain.s);
338
339                 sb.callid.s = (char *)row_vals[callid_col].val.string_val;
340                 sb.callid.len = strlen(sb.callid.s);
341
342                 sb.event = subs->event->wipeer;
343                 sb.status = row_vals[status_col].val.int_val;
344
345                 if(add_watcher_list(&sb, watchers) < 0)
346                         goto error;
347         }
348
349         pa_dbf.free_result(pa_db, result);
350         return 0;
351
352 error:
353         if(result)
354                 pa_dbf.free_result(pa_db, result);
355         return -1;
356 }
357
358 str *get_wi_notify_body(subs_t *subs, subs_t *watcher_subs)
359 {
360         str *notify_body = NULL;
361         char *version_str;
362         watcher_t *watchers = NULL;
363         int len = 0;
364         unsigned int hash_code;
365         subs_t *s = NULL;
366         int state = FULL_STATE_FLAG;
367         unsigned int now = (int)time(NULL);
368
369         hash_code = 0;
370         version_str = int2str(subs->version, &len);
371         if(version_str == NULL) {
372                 LM_ERR("converting int to str\n ");
373                 goto error;
374         }
375
376         watchers = (watcher_t *)pkg_malloc(sizeof(watcher_t));
377         if(watchers == NULL) {
378                 ERR_MEM(PKG_MEM_STR);
379         }
380         memset(watchers, 0, sizeof(watcher_t));
381
382         if(watcher_subs != NULL) {
383                 if(add_watcher_list(watcher_subs, watchers) < 0)
384                         goto error;
385                 state = PARTIAL_STATE_FLAG;
386
387                 goto done;
388         }
389
390         if(pres_subs_dbmode == DB_ONLY) {
391                 if(get_wi_subs_db(subs, watchers) < 0) {
392                         LM_ERR("getting watchers from database\n");
393                         goto error;
394                 }
395         } else {
396                 hash_code = core_case_hash(
397                                 &subs->pres_uri, &subs->event->wipeer->name, shtable_size);
398                 lock_get(&subs_htable[hash_code].lock);
399                 s = subs_htable[hash_code].entries;
400                 while(s->next) {
401                         s = s->next;
402
403                         if(s->expires < now) {
404                                 LM_DBG("expired record\n");
405                                 continue;
406                         }
407
408                         if(s->event == subs->event->wipeer
409                                         && s->pres_uri.len == subs->pres_uri.len
410                                         && presence_sip_uri_match(&s->pres_uri, &subs->pres_uri)
411                                                            == 0) {
412                                 if(add_watcher_list(s, watchers) < 0) {
413                                         lock_release(&subs_htable[hash_code].lock);
414                                         goto error;
415                                 }
416                         }
417                 }
418                 lock_release(&subs_htable[hash_code].lock);
419
420                 if(add_waiting_watchers(
421                                    watchers, subs->pres_uri, subs->event->wipeer->name)
422                                 < 0) {
423                         LM_ERR("failed to add waiting watchers\n");
424                         goto error;
425                 }
426         }
427
428 done:
429         notify_body = create_winfo_xml(watchers, version_str, subs->pres_uri,
430                         subs->event->wipeer->name, state);
431         if(notify_body == NULL) {
432                 LM_ERR("in function create_winfo_xml\n");
433                 goto error;
434         }
435         free_watcher_list(watchers);
436         return notify_body;
437
438 error:
439         free_watcher_list(watchers);
440         return NULL;
441 }
442
443 void free_watcher_list(watcher_t *watchers)
444 {
445         watcher_t *w;
446         while(watchers) {
447                 w = watchers;
448                 if(w->uri.s != NULL)
449                         pkg_free(w->uri.s);
450                 if(w->id.s != NULL)
451                         pkg_free(w->id.s);
452                 watchers = watchers->next;
453                 pkg_free(w);
454         }
455
456         watchers = NULL;
457 }
458
459 int add_watcher_list(subs_t *s, watcher_t *watchers)
460 {
461         watcher_t *w;
462
463         w = (watcher_t *)pkg_malloc(sizeof(watcher_t));
464         if(w == NULL) {
465                 LM_ERR("No more private memory\n");
466                 return -1;
467         }
468         w->status = s->status;
469         if(uandd_to_uri(s->watcher_user, s->watcher_domain, &w->uri) < 0) {
470                 LM_ERR("failed to create uri\n");
471                 goto error;
472         }
473         w->id.s = (char *)pkg_malloc(s->callid.len + 1);
474         if(w->id.s == NULL) {
475                 LM_ERR("no more memory\n");
476                 goto error;
477         }
478         memcpy(w->id.s, s->callid.s, s->callid.len);
479         w->id.len = s->callid.len;
480         w->id.s[w->id.len] = '\0';
481
482         w->next = watchers->next;
483         watchers->next = w;
484
485         return 0;
486
487 error:
488         if(w) {
489                 if(w->uri.s)
490                         pkg_free(w->uri.s);
491                 pkg_free(w);
492         }
493         return -1;
494 }
495
496 str *build_empty_bla_body(str pres_uri)
497 {
498         xmlDocPtr doc;
499         xmlNodePtr node;
500         xmlAttrPtr attr;
501         str *body = NULL;
502         char *text;
503         int len;
504         char *entity = NULL;
505
506         doc = xmlNewDoc(BAD_CAST "1.0");
507         if(doc == NULL) {
508                 LM_ERR("failed to construct xml document\n");
509                 return NULL;
510         }
511
512         node = xmlNewNode(NULL, BAD_CAST "dialog-info");
513         if(node == NULL) {
514                 LM_ERR("failed to initialize node\n");
515                 goto error;
516         }
517         xmlDocSetRootElement(doc, node);
518
519         attr = xmlNewProp(node, BAD_CAST "xmlns",
520                         BAD_CAST "urn:ietf:params:xml:ns:dialog-info");
521         if(attr == NULL) {
522                 LM_ERR("failed to initialize node attribute\n");
523                 goto error;
524         }
525         attr = xmlNewProp(node, BAD_CAST "version", BAD_CAST "1");
526         if(attr == NULL) {
527                 LM_ERR("failed to initialize node attribute\n");
528                 goto error;
529         }
530
531         attr = xmlNewProp(node, BAD_CAST "state", BAD_CAST "full");
532         if(attr == NULL) {
533                 LM_ERR("failed to initialize node attribute\n");
534                 goto error;
535         }
536
537         entity = (char *)pkg_malloc(pres_uri.len + 1);
538         if(entity == NULL) {
539                 LM_ERR("no more memory\n");
540                 goto error;
541         }
542         memcpy(entity, pres_uri.s, pres_uri.len);
543         entity[pres_uri.len] = '\0';
544
545         attr = xmlNewProp(node, BAD_CAST "entity", BAD_CAST entity);
546         if(attr == NULL) {
547                 LM_ERR("failed to initialize node attribute\n");
548                 pkg_free(entity);
549                 goto error;
550         }
551
552         body = (str *)pkg_malloc(sizeof(str));
553         if(body == NULL) {
554                 LM_ERR("no more private memory");
555                 pkg_free(entity);
556                 goto error;
557         }
558
559         xmlDocDumpFormatMemory(doc, (xmlChar **)(void *)&text, &len, 1);
560         body->s = (char *)pkg_malloc(len);
561         if(body->s == NULL) {
562                 LM_ERR("no more private memory");
563                 pkg_free(body);
564                 pkg_free(entity);
565                 goto error;
566         }
567         memcpy(body->s, text, len);
568         body->len = len;
569
570
571         pkg_free(entity);
572         xmlFreeDoc(doc);
573         xmlFree(text);
574
575         return body;
576
577 error:
578         xmlFreeDoc(doc);
579         return NULL;
580 }
581
582 str *get_p_notify_body(str pres_uri, pres_ev_t *event, str *etag, str *contact)
583 {
584         db_key_t query_cols[4];
585         db_val_t query_vals[4];
586         db_op_t query_ops[4];
587         db_key_t result_cols[3];
588         db1_res_t *result = NULL;
589         int body_col, etag_col = 0, sender_col;
590         str **body_array = NULL;
591         str *notify_body = NULL;
592         db_row_t *row = NULL;
593         db_val_t *row_vals;
594         int n_result_cols = 0;
595         int n_query_cols = 0;
596         int i, n = 0, len;
597         int build_off_n = -1;
598         str etags;
599         str *body;
600         int size = 0;
601         struct sip_uri uri;
602         unsigned int hash_code;
603         str sender;
604         static str query_str;
605
606         if(parse_uri(pres_uri.s, pres_uri.len, &uri) < 0) {
607                 LM_ERR("while parsing uri\n");
608                 return NULL;
609         }
610
611         /* if in db_only mode, get the presentity information from database - skip htable search */
612         if(publ_cache_mode == PS_PCACHE_HYBRID) {
613                 /* search in hash table if any record exists */
614                 hash_code = core_case_hash(&pres_uri, NULL, phtable_size);
615                 if(search_phtable(&pres_uri, event->evp->type, hash_code) == NULL) {
616                         LM_DBG("No record exists in hash_table\n");
617
618                         /* for pidf manipulation */
619                         if(event->agg_nbody) {
620                                 notify_body =
621                                                 event->agg_nbody(&uri.user, &uri.host, NULL, 0, -1);
622                                 if(notify_body)
623                                         goto done;
624                         }
625                         return NULL;
626                 }
627         }
628
629         query_cols[n_query_cols] = &str_domain_col;
630         query_vals[n_query_cols].type = DB1_STR;
631         query_vals[n_query_cols].nul = 0;
632         query_vals[n_query_cols].val.str_val = uri.host;
633         query_ops[n_query_cols] = OP_EQ;
634         n_query_cols++;
635
636         query_cols[n_query_cols] = &str_username_col;
637         query_vals[n_query_cols].type = DB1_STR;
638         query_vals[n_query_cols].nul = 0;
639         query_vals[n_query_cols].val.str_val = uri.user;
640         query_ops[n_query_cols] = OP_EQ;
641         n_query_cols++;
642
643         query_cols[n_query_cols] = &str_event_col;
644         query_vals[n_query_cols].type = DB1_STR;
645         query_vals[n_query_cols].nul = 0;
646         query_vals[n_query_cols].val.str_val = event->name;
647         query_ops[n_query_cols] = OP_EQ;
648         n_query_cols++;
649
650         if(pres_startup_mode == 1) {
651                 query_cols[n_query_cols] = &str_expires_col;
652                 query_vals[n_query_cols].type = DB1_INT;
653                 query_vals[n_query_cols].nul = 0;
654                 query_vals[n_query_cols].val.int_val = (int)time(NULL);
655                 query_ops[n_query_cols] = OP_GT;
656                 n_query_cols++;
657         }
658
659         result_cols[body_col = n_result_cols++] = &str_body_col;
660         result_cols[etag_col = n_result_cols++] = &str_etag_col;
661         result_cols[sender_col = n_result_cols++] = &str_sender_col;
662
663         if(pa_dbf.use_table(pa_db, &presentity_table) < 0) {
664                 LM_ERR("in use_table\n");
665                 return NULL;
666         }
667
668         if(pres_retrieve_order == 1) {
669                 query_str = pres_retrieve_order_by;
670         } else {
671                 query_str = str_received_time_col;
672         }
673         if(pres_startup_mode == 1) {
674                 if(pa_dbf.query(pa_db, query_cols, query_ops, query_vals, result_cols,
675                                    n_query_cols, n_result_cols, &query_str, &result)
676                                 < 0) {
677                         LM_ERR("failed to query %.*s table\n", presentity_table.len,
678                                         presentity_table.s);
679                         if(result)
680                                 pa_dbf.free_result(pa_db, result);
681                         return NULL;
682                 }
683         } else {
684                 if(pa_dbf.query(pa_db, query_cols, 0, query_vals, result_cols,
685                                    n_query_cols, n_result_cols, &query_str, &result)
686                                 < 0) {
687                         LM_ERR("failed to query %.*s table\n", presentity_table.len,
688                                         presentity_table.s);
689                         if(result)
690                                 pa_dbf.free_result(pa_db, result);
691                         return NULL;
692                 }
693         }
694
695         if(result == NULL)
696                 return NULL;
697
698         if(result->n <= 0) {
699                 LM_DBG("The query returned no result\n[username]= %.*s"
700                            "\t[domain]= %.*s\t[event]= %.*s\n",
701                                 uri.user.len, uri.user.s, uri.host.len, uri.host.s,
702                                 event->name.len, event->name.s);
703
704                 pa_dbf.free_result(pa_db, result);
705                 result = NULL;
706
707                 if(event->agg_nbody) {
708                         notify_body = event->agg_nbody(&uri.user, &uri.host, NULL, 0, -1);
709                         if(notify_body)
710                                 goto done;
711                 }
712                 return NULL;
713         } else {
714                 n = result->n;
715                 if(event->agg_nbody == NULL) {
716                         LM_DBG("Event does not require aggregation\n");
717                         row = &result->rows[n - 1];
718                         row_vals = ROW_VALUES(row);
719
720                         /* if event BLA - check if sender is the same as contact */
721                         /* if so, send an empty dialog info document */
722                         if(EVENT_DIALOG_SLA(event->evp) && contact) {
723                                 sender.s = (char *)row_vals[sender_col].val.string_val;
724                                 if(sender.s == NULL || strlen(sender.s) == 0)
725                                         goto after_sender_check;
726                                 sender.len = strlen(sender.s);
727
728                                 if(sender.len == contact->len
729                                                 && presence_sip_uri_match(&sender, contact) == 0) {
730                                         notify_body = build_empty_bla_body(pres_uri);
731                                         pa_dbf.free_result(pa_db, result);
732                                         return notify_body;
733                                 }
734                         }
735
736                 after_sender_check:
737                         if(row_vals[body_col].val.string_val == NULL) {
738                                 LM_ERR("NULL notify body record\n");
739                                 goto error;
740                         }
741                         len = strlen(row_vals[body_col].val.string_val);
742                         if(len == 0) {
743                                 LM_ERR("Empty notify body record\n");
744                                 goto error;
745                         }
746                         notify_body = (str *)pkg_malloc(sizeof(str));
747                         if(notify_body == NULL) {
748                                 ERR_MEM(PKG_MEM_STR);
749                         }
750                         memset(notify_body, 0, sizeof(str));
751                         notify_body->s = (char *)pkg_malloc(len * sizeof(char));
752                         if(notify_body->s == NULL) {
753                                 pkg_free(notify_body);
754                                 ERR_MEM(PKG_MEM_STR);
755                         }
756                         memcpy(notify_body->s, row_vals[body_col].val.string_val, len);
757                         notify_body->len = len;
758                         pa_dbf.free_result(pa_db, result);
759
760                         return notify_body;
761                 }
762
763                 LM_DBG("Event requires aggregation\n");
764
765                 body_array = (str **)pkg_malloc((n + 2) * sizeof(str *));
766                 if(body_array == NULL) {
767                         ERR_MEM(PKG_MEM_STR);
768                 }
769                 memset(body_array, 0, (n + 2) * sizeof(str *));
770
771                 if(etag != NULL) {
772                         LM_DBG("searched etag = %.*s len= %d\n", etag->len, etag->s,
773                                         etag->len);
774                         LM_DBG("etag not NULL\n");
775                         for(i = 0; i < n; i++) {
776                                 row = &result->rows[i];
777                                 row_vals = ROW_VALUES(row);
778                                 etags.s = (char *)row_vals[etag_col].val.string_val;
779                                 etags.len = strlen(etags.s);
780
781                                 LM_DBG("etag = %.*s len= %d\n", etags.len, etags.s, etags.len);
782                                 if((etags.len == etag->len)
783                                                 && (strncmp(etags.s, etag->s, etags.len) == 0)) {
784                                         LM_DBG("found etag\n");
785                                         build_off_n = i;
786                                 }
787                                 len = strlen((char *)row_vals[body_col].val.string_val);
788                                 if(len == 0) {
789                                         LM_ERR("Empty notify body record\n");
790                                         goto error;
791                                 }
792
793                                 size = sizeof(str) + len * sizeof(char);
794                                 body = (str *)pkg_malloc(size);
795                                 if(body == NULL) {
796                                         ERR_MEM(PKG_MEM_STR);
797                                 }
798                                 memset(body, 0, size);
799                                 size = sizeof(str);
800                                 body->s = (char *)body + size;
801                                 memcpy(body->s, (char *)row_vals[body_col].val.string_val, len);
802                                 body->len = len;
803
804                                 body_array[i] = body;
805                         }
806                 } else {
807                         for(i = 0; i < n; i++) {
808                                 row = &result->rows[i];
809                                 row_vals = ROW_VALUES(row);
810
811                                 len = strlen((char *)row_vals[body_col].val.string_val);
812                                 if(len == 0) {
813                                         LM_ERR("Empty notify body record\n");
814                                         goto error;
815                                 }
816
817                                 size = sizeof(str) + len * sizeof(char);
818                                 body = (str *)pkg_malloc(size);
819                                 if(body == NULL) {
820                                         ERR_MEM(PKG_MEM_STR);
821                                 }
822                                 memset(body, 0, size);
823                                 size = sizeof(str);
824                                 body->s = (char *)body + size;
825                                 memcpy(body->s, row_vals[body_col].val.string_val, len);
826                                 body->len = len;
827
828                                 body_array[i] = body;
829                         }
830                 }
831                 pa_dbf.free_result(pa_db, result);
832                 result = NULL;
833
834                 notify_body = event->agg_nbody(
835                                 &uri.user, &uri.host, body_array, n, build_off_n);
836         }
837
838 done:
839         if(body_array != NULL) {
840                 for(i = 0; i < n; i++) {
841                         if(body_array[i])
842                                 pkg_free(body_array[i]);
843                 }
844                 pkg_free(body_array);
845         }
846         return notify_body;
847
848 error:
849         if(result != NULL)
850                 pa_dbf.free_result(pa_db, result);
851
852         if(body_array != NULL) {
853                 for(i = 0; i < n; i++) {
854                         if(body_array[i])
855                                 pkg_free(body_array[i]);
856                         else
857                                 break;
858                 }
859
860                 pkg_free(body_array);
861         }
862         return NULL;
863 }
864
865 void free_notify_body(str *body, pres_ev_t *ev)
866 {
867         if(body != NULL) {
868                 if(body->s != NULL) {
869                         if(ev->type & WINFO_TYPE)
870                                 xmlFree(body->s);
871                         else if(ev->agg_nbody == NULL && ev->apply_auth_nbody == NULL)
872                                 pkg_free(body->s);
873                         else
874                                 ev->free_body(body->s);
875                 }
876                 pkg_free(body);
877         }
878 }
879
880 static int ps_free_tm_dlg(dlg_t *td)
881 {
882         if(td) {
883                 if(td->loc_uri.s)
884                         pkg_free(td->loc_uri.s);
885                 if(td->rem_uri.s)
886                         pkg_free(td->rem_uri.s);
887
888                 if(td->route_set)
889                         free_rr(&td->route_set);
890                 pkg_free(td);
891         }
892         return 0;
893 }
894
895 dlg_t *ps_build_dlg_t(subs_t *subs)
896 {
897         dlg_t *td = NULL;
898         int found_contact = 1;
899
900         td = (dlg_t *)pkg_malloc(sizeof(dlg_t));
901         if(td == NULL) {
902                 ERR_MEM(PKG_MEM_STR);
903         }
904         memset(td, 0, sizeof(dlg_t));
905
906         td->loc_seq.value = subs->local_cseq;
907         td->loc_seq.is_set = 1;
908
909         td->id.call_id = subs->callid;
910         td->id.rem_tag = subs->from_tag;
911         td->id.loc_tag = subs->to_tag;
912
913         uandd_to_uri(subs->to_user, subs->to_domain, &td->loc_uri);
914         if(td->loc_uri.s == NULL) {
915                 LM_ERR("while creating uri\n");
916                 goto error;
917         }
918
919         if(subs->contact.len == 0 || subs->contact.s == NULL) {
920                 found_contact = 0;
921         } else {
922                 LM_DBG("CONTACT = %.*s\n", subs->contact.len, subs->contact.s);
923                 td->rem_target = subs->contact;
924         }
925
926         uandd_to_uri(subs->from_user, subs->from_domain, &td->rem_uri);
927         if(td->rem_uri.s == NULL) {
928                 LM_ERR("while creating uri\n");
929                 goto error;
930         }
931
932         if(found_contact == 0) {
933                 td->rem_target = td->rem_uri;
934         }
935         if(subs->record_route.s && subs->record_route.len) {
936                 if(parse_rr_body(
937                                    subs->record_route.s, subs->record_route.len, &td->route_set)
938                                 < 0) {
939                         LM_ERR("in function parse_rr_body\n");
940                         goto error;
941                 }
942         }
943         td->state = DLG_CONFIRMED;
944
945         if(subs->sockinfo_str.len) {
946                 int port, proto;
947                 str host;
948                 char *tmp;
949                 if((tmp = as_asciiz(&subs->sockinfo_str)) == NULL) {
950                         LM_ERR("no pkg memory left\n");
951                         goto error;
952                 }
953                 if(parse_phostport(tmp, &host.s, &host.len, &port, &proto)) {
954                         LM_ERR("bad sockinfo string\n");
955                         pkg_free(tmp);
956                         goto error;
957                 }
958                 td->send_sock = grep_sock_info(
959                                 &host, (unsigned short)port, (unsigned short)proto);
960                 pkg_free(tmp);
961         }
962
963         return td;
964
965 error:
966         ps_free_tm_dlg(td);
967         return NULL;
968 }
969
970 int get_subs_db(
971                 str *pres_uri, pres_ev_t *event, str *sender, subs_t **s_array, int *n)
972 {
973         db_key_t query_cols[7];
974         db_op_t query_ops[7];
975         db_val_t query_vals[7];
976         db_key_t result_cols[21];
977         int n_result_cols = 0, n_query_cols = 0;
978         db_row_t *row;
979         db_val_t *row_vals;
980         db1_res_t *result = NULL;
981         int from_user_col, from_domain_col, from_tag_col;
982         int to_user_col, to_domain_col, to_tag_col;
983         int expires_col = 0, callid_col, cseq_col, i, reason_col;
984         int version_col = 0, record_route_col = 0, contact_col = 0;
985         int sockinfo_col = 0, local_contact_col = 0, event_id_col = 0;
986         int watcher_user_col = 0, watcher_domain_col = 0;
987         int flags_col = 0, user_agent_col = 0;
988         subs_t s, *s_new;
989         int inc = 0;
990
991         if(pa_dbf.use_table(pa_db, &active_watchers_table) < 0) {
992                 LM_ERR("in use_table\n");
993                 return -1;
994         }
995
996         LM_DBG("querying database table = active_watchers\n");
997         query_cols[n_query_cols] = &str_presentity_uri_col;
998         query_ops[n_query_cols] = OP_EQ;
999         query_vals[n_query_cols].type = DB1_STR;
1000         query_vals[n_query_cols].nul = 0;
1001         query_vals[n_query_cols].val.str_val = *pres_uri;
1002         n_query_cols++;
1003
1004         query_cols[n_query_cols] = &str_event_col;
1005         query_ops[n_query_cols] = OP_EQ;
1006         query_vals[n_query_cols].type = DB1_STR;
1007         query_vals[n_query_cols].nul = 0;
1008         query_vals[n_query_cols].val.str_val = event->name;
1009         n_query_cols++;
1010
1011         query_cols[n_query_cols] = &str_status_col;
1012         query_ops[n_query_cols] = OP_EQ;
1013         query_vals[n_query_cols].type = DB1_INT;
1014         query_vals[n_query_cols].nul = 0;
1015         query_vals[n_query_cols].val.int_val = ACTIVE_STATUS;
1016         n_query_cols++;
1017
1018         query_cols[n_query_cols] = &str_contact_col;
1019         query_ops[n_query_cols] = OP_NEQ;
1020         query_vals[n_query_cols].type = DB1_STR;
1021         query_vals[n_query_cols].nul = 0;
1022         if(sender) {
1023                 LM_DBG("Do not send Notify to:[uri]= %.*s\n", sender->len, sender->s);
1024                 query_vals[n_query_cols].val.str_val = *sender;
1025         } else {
1026                 query_vals[n_query_cols].val.str_val.s = "";
1027                 query_vals[n_query_cols].val.str_val.len = 0;
1028         }
1029         n_query_cols++;
1030
1031         result_cols[to_user_col = n_result_cols++] = &str_to_user_col;
1032         result_cols[to_domain_col = n_result_cols++] = &str_to_domain_col;
1033         result_cols[from_user_col = n_result_cols++] = &str_from_user_col;
1034         result_cols[from_domain_col = n_result_cols++] = &str_from_domain_col;
1035         result_cols[watcher_user_col = n_result_cols++] = &str_watcher_username_col;
1036         result_cols[watcher_domain_col = n_result_cols++] = &str_watcher_domain_col;
1037         result_cols[event_id_col = n_result_cols++] = &str_event_id_col;
1038         result_cols[from_tag_col = n_result_cols++] = &str_from_tag_col;
1039         result_cols[to_tag_col = n_result_cols++] = &str_to_tag_col;
1040         result_cols[callid_col = n_result_cols++] = &str_callid_col;
1041         result_cols[cseq_col = n_result_cols++] = &str_local_cseq_col;
1042         result_cols[record_route_col = n_result_cols++] = &str_record_route_col;
1043         result_cols[contact_col = n_result_cols++] = &str_contact_col;
1044         result_cols[expires_col = n_result_cols++] = &str_expires_col;
1045         result_cols[reason_col = n_result_cols++] = &str_reason_col;
1046         result_cols[sockinfo_col = n_result_cols++] = &str_socket_info_col;
1047         result_cols[local_contact_col = n_result_cols++] = &str_local_contact_col;
1048         result_cols[version_col = n_result_cols++] = &str_version_col;
1049         result_cols[flags_col = n_result_cols++] = &str_flags_col;
1050         result_cols[user_agent_col = n_result_cols++] = &str_user_agent_col;
1051
1052         if(pa_dbf.query(pa_db, query_cols, query_ops, query_vals, result_cols,
1053                            n_query_cols, n_result_cols, 0, &result)
1054                         < 0) {
1055                 LM_ERR("while querying database\n");
1056                 if(result) {
1057                         pa_dbf.free_result(pa_db, result);
1058                 }
1059                 return -1;
1060         }
1061
1062         if(result == NULL)
1063                 return -1;
1064
1065         if(result->n <= 0) {
1066                 LM_DBG("The query for subscribtion for [uri]= %.*s for [event]= %.*s"
1067                            " returned no result\n",
1068                                 pres_uri->len, pres_uri->s, event->name.len, event->name.s);
1069                 pa_dbf.free_result(pa_db, result);
1070                 return 0;
1071         }
1072         LM_DBG("found %d dialogs\n", result->n);
1073
1074         for(i = 0; i < result->n; i++) {
1075                 row = &result->rows[i];
1076                 row_vals = ROW_VALUES(row);
1077
1078                 if(row_vals[reason_col].val.string_val) {
1079                         if(strlen(row_vals[reason_col].val.string_val) != 0)
1080                                 continue;
1081                 }
1082
1083                 //      s.reason.len= strlen(s.reason.s);
1084
1085                 memset(&s, 0, sizeof(subs_t));
1086                 s.status = ACTIVE_STATUS;
1087
1088                 s.pres_uri = *pres_uri;
1089                 s.to_user.s = (char *)row_vals[to_user_col].val.string_val;
1090                 s.to_user.len = strlen(s.to_user.s);
1091
1092                 s.to_domain.s = (char *)row_vals[to_domain_col].val.string_val;
1093                 s.to_domain.len = strlen(s.to_domain.s);
1094
1095                 s.from_user.s = (char *)row_vals[from_user_col].val.string_val;
1096                 s.from_user.len = strlen(s.from_user.s);
1097
1098                 s.from_domain.s = (char *)row_vals[from_domain_col].val.string_val;
1099                 s.from_domain.len = strlen(s.from_domain.s);
1100
1101                 s.watcher_user.s = (char *)row_vals[watcher_user_col].val.string_val;
1102                 s.watcher_user.len = strlen(s.watcher_user.s);
1103
1104                 s.watcher_domain.s =
1105                                 (char *)row_vals[watcher_domain_col].val.string_val;
1106                 s.watcher_domain.len = strlen(s.watcher_domain.s);
1107
1108                 s.event_id.s = (char *)row_vals[event_id_col].val.string_val;
1109                 s.event_id.len = (s.event_id.s) ? strlen(s.event_id.s) : 0;
1110
1111                 s.to_tag.s = (char *)row_vals[to_tag_col].val.string_val;
1112                 s.to_tag.len = strlen(s.to_tag.s);
1113
1114                 s.from_tag.s = (char *)row_vals[from_tag_col].val.string_val;
1115                 s.from_tag.len = strlen(s.from_tag.s);
1116
1117                 s.callid.s = (char *)row_vals[callid_col].val.string_val;
1118                 s.callid.len = strlen(s.callid.s);
1119
1120                 s.record_route.s = (char *)row_vals[record_route_col].val.string_val;
1121                 s.record_route.len = (s.record_route.s) ? strlen(s.record_route.s) : 0;
1122
1123                 s.contact.s = (char *)row_vals[contact_col].val.string_val;
1124                 s.contact.len = strlen(s.contact.s);
1125
1126                 s.sockinfo_str.s = (char *)row_vals[sockinfo_col].val.string_val;
1127                 s.sockinfo_str.len = s.sockinfo_str.s ? strlen(s.sockinfo_str.s) : 0;
1128
1129                 s.local_contact.s = (char *)row_vals[local_contact_col].val.string_val;
1130                 s.local_contact.len = s.local_contact.s ? strlen(s.local_contact.s) : 0;
1131
1132                 s.event = event;
1133                 s.local_cseq = row_vals[cseq_col].val.int_val + 1;
1134                 if(row_vals[expires_col].val.int_val < (int)time(NULL) + pres_expires_offset)
1135                         s.expires = 0;
1136                 else
1137                         s.expires = row_vals[expires_col].val.int_val - (int)time(NULL);
1138                 s.version = row_vals[version_col].val.int_val + 1;
1139                 s.flags = row_vals[flags_col].val.int_val;
1140                 s.user_agent.s = (char *)row_vals[user_agent_col].val.string_val;
1141                 s.user_agent.len = (s.user_agent.s) ? strlen(s.user_agent.s) : 0;
1142
1143                 s_new = mem_copy_subs(&s, PKG_MEM_TYPE);
1144                 if(s_new == NULL) {
1145                         LM_ERR("while copying subs_t structure\n");
1146                         goto error;
1147                 }
1148                 s_new->next = (*s_array);
1149                 (*s_array) = s_new;
1150                 printf_subs(s_new);
1151                 inc++;
1152         }
1153         pa_dbf.free_result(pa_db, result);
1154         *n = inc;
1155
1156         return 0;
1157
1158 error:
1159         if(result)
1160                 pa_dbf.free_result(pa_db, result);
1161
1162         return -1;
1163 }
1164
1165 subs_t *get_subs_dialog(str *pres_uri, pres_ev_t *event, str *sender)
1166 {
1167         unsigned int hash_code;
1168         subs_t *s = NULL, *s_new;
1169         subs_t *s_array = NULL;
1170         int n = 0;
1171
1172         /* if pres_subs_dbmode!=DB_ONLY, should take the subscriptions from the
1173                 hashtable only in DB_ONLY mode should take all dialogs from db */
1174
1175         if(pres_subs_dbmode == DB_ONLY) {
1176                 if(get_subs_db(pres_uri, event, sender, &s_array, &n) < 0) {
1177                         LM_ERR("getting dialogs from database\n");
1178                         goto error;
1179                 }
1180         } else {
1181                 hash_code = core_case_hash(pres_uri, &event->name, shtable_size);
1182
1183                 lock_get(&subs_htable[hash_code].lock);
1184
1185                 s = subs_htable[hash_code].entries;
1186
1187                 while(s->next) {
1188                         s = s->next;
1189
1190                         printf_subs(s);
1191
1192                         if(s->expires < (int)time(NULL)) {
1193                                 LM_DBG("expired subs\n");
1194                                 continue;
1195                         }
1196
1197                         if((!(s->status == ACTIVE_STATUS && s->reason.len == 0
1198                                            && s->event == event && s->pres_uri.len == pres_uri->len
1199                                            && presence_sip_uri_match(&s->pres_uri, pres_uri) == 0))
1200                                         || (sender && sender->len == s->contact.len
1201                                                            && presence_sip_uri_match(sender, &s->contact)
1202                                                                                   == 0))
1203                                 continue;
1204
1205                         s_new = mem_copy_subs(s, PKG_MEM_TYPE);
1206                         if(s_new == NULL) {
1207                                 LM_ERR("copying subs_t structure\n");
1208                                 lock_release(&subs_htable[hash_code].lock);
1209                                 goto error;
1210                         }
1211                         s_new->expires -= (int)time(NULL);
1212                         s_new->next = s_array;
1213                         s_array = s_new;
1214                 }
1215                 lock_release(&subs_htable[hash_code].lock);
1216         }
1217
1218         return s_array;
1219
1220 error:
1221         free_subs_list(s_array, PKG_MEM_TYPE, 0);
1222         return NULL;
1223 }
1224
1225 int publ_notify(presentity_t *p, str pres_uri, str *body, str *offline_etag,
1226                 str *rules_doc)
1227 {
1228         str *notify_body = NULL;
1229         subs_t *subs_array = NULL, *s = NULL;
1230         int ret_code = -1;
1231
1232         subs_array = get_subs_dialog(&pres_uri, p->event, p->sender);
1233         if(subs_array == NULL) {
1234                 LM_DBG("Could not find subs_dialog\n");
1235                 ret_code = 0;
1236                 goto done;
1237         }
1238
1239         /* if the event does not require aggregation - we have the final body */
1240         if(p->event->agg_nbody) {
1241                 notify_body = get_p_notify_body(pres_uri, p->event, offline_etag, NULL);
1242                 if(notify_body == NULL) {
1243                         LM_DBG("Could not get the notify_body\n");
1244                         /* goto error; */
1245                 }
1246         }
1247
1248         s = subs_array;
1249         while(s) {
1250                 s->auth_rules_doc = rules_doc;
1251
1252                 if(notify(s, NULL, notify_body ? notify_body : body, 0,
1253                                    p->event->aux_body_processing)
1254                                 < 0) {
1255                         LM_ERR("Could not send notify for %.*s\n", p->event->name.len,
1256                                         p->event->name.s);
1257                 }
1258
1259                 s = s->next;
1260         }
1261         ret_code = 0;
1262
1263 done:
1264         free_subs_list(subs_array, PKG_MEM_TYPE, 0);
1265         free_notify_body(notify_body, p->event);
1266         return ret_code;
1267 }
1268
1269 int publ_notify_notifier(str pres_uri, pres_ev_t *event)
1270 {
1271         db_key_t query_cols[2], result_cols[3];
1272         db_val_t query_vals[2], *values;
1273         db_row_t *rows;
1274         db1_res_t *result = NULL;
1275         int n_query_cols = 0, n_result_cols = 0;
1276         int r_callid_col = 0, r_to_tag_col = 0, r_from_tag_col = 0;
1277         int i;
1278         int ret = -1;
1279         subs_t subs;
1280         db_query_f query_fn = pa_dbf.query_lock ? pa_dbf.query_lock : pa_dbf.query;
1281
1282         if(pa_db == NULL) {
1283                 LM_ERR("null database connection\n");
1284                 goto error;
1285         }
1286
1287         if(pa_dbf.use_table(pa_db, &active_watchers_table) < 0) {
1288                 LM_ERR("use table failed\n");
1289                 goto error;
1290         }
1291
1292         query_cols[n_query_cols] = &str_presentity_uri_col;
1293         query_vals[n_query_cols].type = DB1_STR;
1294         query_vals[n_query_cols].nul = 0;
1295         query_vals[n_query_cols].val.str_val = pres_uri;
1296         n_query_cols++;
1297
1298         query_cols[n_query_cols] = &str_event_col;
1299         query_vals[n_query_cols].type = DB1_STR;
1300         query_vals[n_query_cols].nul = 0;
1301         query_vals[n_query_cols].val.str_val = event->name;
1302         n_query_cols++;
1303
1304         result_cols[r_callid_col = n_result_cols++] = &str_callid_col;
1305         result_cols[r_to_tag_col = n_result_cols++] = &str_to_tag_col;
1306         result_cols[r_from_tag_col = n_result_cols++] = &str_from_tag_col;
1307
1308         if(query_fn(pa_db, query_cols, 0, query_vals, result_cols, n_query_cols,
1309                            n_result_cols, 0, &result)
1310                         < 0) {
1311                 LM_ERR("Can't query db\n");
1312                 goto error;
1313         }
1314
1315         if(result == NULL) {
1316                 LM_ERR("bad result\n");
1317                 goto error;
1318         }
1319
1320         rows = RES_ROWS(result);
1321         for(i = 0; i < RES_ROW_N(result); i++) {
1322                 values = ROW_VALUES(&rows[i]);
1323
1324                 subs.callid.s = (char *)VAL_STRING(&values[r_callid_col]);
1325                 subs.callid.len = strlen(subs.callid.s);
1326                 subs.to_tag.s = (char *)VAL_STRING(&values[r_to_tag_col]);
1327                 subs.to_tag.len = strlen(subs.to_tag.s);
1328                 subs.from_tag.s = (char *)VAL_STRING(&values[r_from_tag_col]);
1329                 subs.from_tag.len = strlen(subs.from_tag.s);
1330
1331                 set_updated(&subs);
1332         }
1333
1334         ret = RES_ROW_N(result);
1335
1336 error:
1337         if(result)
1338                 pa_dbf.free_result(pa_db, result);
1339
1340         return ret;
1341 }
1342
1343 int query_db_notify(str *pres_uri, pres_ev_t *event, subs_t *watcher_subs)
1344 {
1345         subs_t *subs_array = NULL, *s = NULL;
1346         str *notify_body = NULL, *aux_body = NULL;
1347         int ret_code = -1;
1348
1349         subs_array = get_subs_dialog(pres_uri, event, NULL);
1350         if(subs_array == NULL) {
1351                 LM_DBG("Could not get subscription dialog\n");
1352                 ret_code = 1;
1353                 goto done;
1354         }
1355
1356         s = subs_array;
1357
1358         if(pres_notifier_processes > 0) {
1359                 while(s) {
1360                         set_updated(s);
1361                         s = s->next;
1362                 }
1363         } else {
1364                 if(event->type & PUBL_TYPE)
1365                         notify_body = get_p_notify_body(*pres_uri, event, NULL, NULL);
1366
1367                 while(s) {
1368
1369                         if(event->aux_body_processing) {
1370                                 aux_body = event->aux_body_processing(s, notify_body);
1371                         }
1372
1373                         if(notify(s, watcher_subs, aux_body ? aux_body : notify_body, 0, 0)
1374                                         < 0) {
1375                                 LM_ERR("Could not send notify for [event]=%.*s\n",
1376                                                 event->name.len, event->name.s);
1377                                 goto done;
1378                         }
1379
1380                         if(aux_body != NULL) {
1381                                 if(aux_body->s) {
1382                                         event->aux_free_body(aux_body->s);
1383                                 }
1384                                 pkg_free(aux_body);
1385                         }
1386                         s = s->next;
1387                 }
1388         }
1389
1390         ret_code = 1;
1391
1392 done:
1393         free_subs_list(subs_array, PKG_MEM_TYPE, 0);
1394         free_notify_body(notify_body, event);
1395
1396         return ret_code;
1397 }
1398
1399 int send_notify_request(
1400                 subs_t *subs, subs_t *watcher_subs, str *n_body, int force_null_body)
1401 {
1402         dlg_t *td = NULL;
1403         str met = {"NOTIFY", 6};
1404         str str_hdr = {0, 0};
1405         str *notify_body = NULL;
1406         int result = 0;
1407         subs_t *cb_param = NULL;
1408         str *final_body = NULL;
1409         uac_req_t uac_r;
1410         str *aux_body = NULL;
1411         subs_t *backup_subs = NULL;
1412
1413         LM_DBG("dialog info:\n");
1414         printf_subs(subs);
1415
1416         /* getting the status of the subscription */
1417
1418         if(force_null_body) {
1419                 goto jump_over_body;
1420         }
1421
1422         if(n_body != NULL && subs->status == ACTIVE_STATUS) {
1423                 if(subs->event->req_auth) {
1424
1425                         if(subs->auth_rules_doc && subs->event->apply_auth_nbody) {
1426                                 if(subs->event->apply_auth_nbody(n_body, subs, &notify_body)
1427                                                 < 0) {
1428                                         LM_ERR("in function apply_auth_nbody\n");
1429                                         goto error;
1430                                 }
1431                         }
1432                         if(notify_body == NULL)
1433                                 notify_body = n_body;
1434                 } else
1435                         notify_body = n_body;
1436         } else {
1437                 if(subs->status == TERMINATED_STATUS
1438                                 || subs->status == PENDING_STATUS) {
1439                         LM_DBG("state terminated or pending- notify body NULL\n");
1440                         notify_body = NULL;
1441                 } else {
1442                         if(subs->event->type & WINFO_TYPE) {
1443                                 notify_body = get_wi_notify_body(subs, watcher_subs);
1444                                 if(notify_body == NULL) {
1445                                         LM_DBG("Could not get notify_body\n");
1446                                         goto error;
1447                                 }
1448                         } else {
1449                                 notify_body = get_p_notify_body(subs->pres_uri, subs->event,
1450                                                 NULL, (subs->contact.s) ? &subs->contact : NULL);
1451                                 if(notify_body == NULL || notify_body->s == NULL) {
1452                                         LM_DBG("Could not get the notify_body\n");
1453                                 } else {
1454                                         /* call aux_body_processing if exists */
1455                                         if(subs->event->aux_body_processing) {
1456                                                 aux_body = subs->event->aux_body_processing(
1457                                                                 subs, notify_body);
1458                                                 if(aux_body) {
1459                                                         free_notify_body(notify_body, subs->event);
1460                                                         notify_body = aux_body;
1461                                                 }
1462                                         }
1463
1464                                         /* apply authorization rules if exists */
1465                                         if(subs->event->req_auth) {
1466                                                 if(subs->auth_rules_doc && subs->event->apply_auth_nbody
1467                                                                 && subs->event->apply_auth_nbody(
1468                                                                                    notify_body, subs, &final_body)
1469                                                                                    < 0) {
1470                                                         LM_ERR("in function apply_auth\n");
1471                                                         goto error;
1472                                                 }
1473                                                 if(final_body) {
1474                                                         xmlFree(notify_body->s);
1475                                                         pkg_free(notify_body);
1476                                                         notify_body = final_body;
1477                                                 }
1478                                         }
1479                                 }
1480                         }
1481                 }
1482         }
1483
1484 jump_over_body:
1485
1486         if(subs->expires <= 0) {
1487                 subs->expires = 0;
1488                 subs->status = TERMINATED_STATUS;
1489                 subs->reason.s = "timeout";
1490                 subs->reason.len = 7;
1491         }
1492
1493         /* build extra headers */
1494         if(build_str_hdr(subs, notify_body ? 1 : 0, &str_hdr) < 0) {
1495                 LM_ERR("while building headers\n");
1496                 goto error;
1497         }
1498         LM_DBG("headers:\n%.*s\n", str_hdr.len, str_hdr.s);
1499
1500         /* construct the dlg_t structure */
1501         td = ps_build_dlg_t(subs);
1502         if(td == NULL) {
1503                 LM_ERR("while building dlg_t structure\n");
1504                 goto error;
1505         }
1506
1507         LM_DBG("expires %d status %d\n", subs->expires, subs->status);
1508         cb_param = mem_copy_subs(subs, SHM_MEM_TYPE);
1509
1510         if(_pres_subs_mode==1) {
1511                 backup_subs = _pres_subs_last_sub;
1512                 _pres_subs_last_sub = subs;
1513         }
1514
1515         set_uac_req(&uac_r, &met, &str_hdr, notify_body, td, TMCB_LOCAL_COMPLETED,
1516                         p_tm_callback, (void *)cb_param);
1517         result = tmb.t_request_within(&uac_r);
1518         if(_pres_subs_mode==1) {
1519                 _pres_subs_last_sub = backup_subs;
1520         }
1521         if(result < 0) {
1522                 LM_ERR("in function tmb.t_request_within\n");
1523                 if(cb_param)
1524                         shm_free(cb_param);
1525                 goto error;
1526         }
1527
1528         LM_GEN2(pres_local_log_facility, pres_local_log_level,
1529                         "NOTIFY %.*s via %.*s on behalf of %.*s for event %.*s : %.*s\n",
1530                         td->rem_uri.len, td->rem_uri.s, td->hooks.next_hop->len,
1531                         td->hooks.next_hop->s, td->loc_uri.len, td->loc_uri.s,
1532                         subs->event->name.len, subs->event->name.s, subs->callid.len,
1533                         subs->callid.s);
1534
1535         ps_free_tm_dlg(td);
1536
1537         if(str_hdr.s)
1538                 pkg_free(str_hdr.s);
1539
1540         if((int)(long)n_body != (int)(long)notify_body)
1541                 free_notify_body(notify_body, subs->event);
1542
1543         return 0;
1544
1545 error:
1546         ps_free_tm_dlg(td);
1547         if(str_hdr.s != NULL)
1548                 pkg_free(str_hdr.s);
1549         if((int)(long)n_body != (int)(long)notify_body) {
1550                 if(notify_body != NULL) {
1551                         if(notify_body->s != NULL) {
1552                                 if(subs->event->type & WINFO_TYPE)
1553                                         xmlFree(notify_body->s);
1554                                 else if(subs->event->apply_auth_nbody == NULL
1555                                                 && subs->event->agg_nbody == NULL)
1556                                         pkg_free(notify_body->s);
1557                                 else
1558                                         subs->event->free_body(notify_body->s);
1559                         }
1560                         pkg_free(notify_body);
1561                 }
1562         }
1563         return -1;
1564 }
1565
1566
1567 int notify(subs_t *subs, subs_t *watcher_subs, str *n_body, int force_null_body,
1568                 aux_body_processing_t *aux_body_processing)
1569 {
1570
1571         str *aux_body = NULL;
1572
1573         /* update first in hash table and the send Notify */
1574         if(subs->expires != 0 && subs->status != TERMINATED_STATUS) {
1575                 unsigned int hash_code;
1576                 hash_code = core_case_hash(
1577                                 &subs->pres_uri, &subs->event->name, shtable_size);
1578
1579                 /* if subscriptions are held also in memory, update the subscription hashtable */
1580                 if(pres_subs_dbmode != DB_ONLY) {
1581                         if(update_shtable(subs_htable, hash_code, subs, LOCAL_TYPE) < 0) {
1582                                 /* subscriptions are held only in memory, and hashtable update failed */
1583                                 LM_ERR("updating subscription record in hash table\n");
1584                                 return -1;
1585                         }
1586                 }
1587                 /* if DB_ONLY mode or WRITE_THROUGH update in database */
1588                 if(subs->recv_event != PRES_SUBSCRIBE_RECV
1589                                 && ((pres_subs_dbmode == DB_ONLY && pres_notifier_processes == 0)
1590                                                    || pres_subs_dbmode == WRITE_THROUGH)) {
1591                         LM_DBG("updating subscription to database\n");
1592                         if(update_subs_db(subs, LOCAL_TYPE) < 0) {
1593                                 LM_ERR("updating subscription in database\n");
1594                                 return -1;
1595                         }
1596                 }
1597         }
1598
1599         if(subs->reason.s && subs->status == ACTIVE_STATUS && subs->reason.len == 12
1600                         && strncmp(subs->reason.s, "polite-block", 12) == 0) {
1601                 force_null_body = 1;
1602         }
1603
1604         if(!force_null_body && aux_body_processing) {
1605                 aux_body = aux_body_processing(subs, n_body);
1606         }
1607
1608         if(send_notify_request(subs, watcher_subs, aux_body ? aux_body : n_body,
1609                            force_null_body)
1610                         < 0) {
1611                 LM_ERR("sending Notify not successful\n");
1612                 if(aux_body != NULL) {
1613                         if(aux_body->s) {
1614                                 subs->event->aux_free_body(aux_body->s);
1615                         }
1616                         pkg_free(aux_body);
1617                 }
1618                 return -1;
1619         }
1620
1621         if(aux_body != NULL) {
1622                 if(aux_body->s) {
1623                         subs->event->aux_free_body(aux_body->s);
1624                 }
1625                 pkg_free(aux_body);
1626         }
1627         return 0;
1628 }
1629
1630 static sip_msg_t *_pres_subs_notify_reply_msg = NULL;
1631 static int _pres_subs_notify_reply_code = 0;
1632
1633 int pv_parse_notify_reply_var_name(pv_spec_p sp, str *in)
1634 {
1635         pv_spec_t *pv = NULL;
1636         if(in->s == NULL || in->len <= 0)
1637                 return -1;
1638         pv = (pv_spec_t *)pkg_malloc(sizeof(pv_spec_t));
1639         if(pv == NULL)
1640                 return -1;
1641         memset(pv, 0, sizeof(pv_spec_t));
1642         if(pv_parse_spec(in, pv) == NULL)
1643                 goto error;
1644         sp->pvp.pvn.u.dname = (void *)pv;
1645         sp->pvp.pvn.type = PV_NAME_PVAR;
1646         return 0;
1647
1648 error:
1649         LM_ERR("invalid pv name [%.*s]\n", in->len, in->s);
1650         if(pv != NULL)
1651                 pkg_free(pv);
1652         return -1;
1653 }
1654
1655 int pv_get_notify_reply(struct sip_msg *msg, pv_param_t *param, pv_value_t *res)
1656 {
1657         pv_spec_t *pv = NULL;
1658
1659         if(msg == NULL)
1660                 return 1;
1661
1662         pv = (pv_spec_t *)param->pvn.u.dname;
1663         if(pv == NULL)
1664                 return pv_get_null(msg, param, res);
1665
1666         return pv_get_spec_value(_pres_subs_notify_reply_msg, pv, res);
1667 }
1668
1669 #define FAKED_SIP_408_MSG_FORMAT                                  \
1670         "SIP/2.0 408 TIMEOUT\r\nVia: SIP/2.0/UDP 127.0.0.1\r\nFrom: " \
1671         "invalid;\r\nTo: invalid\r\nCall-ID: invalid\r\nCSeq: 1 "     \
1672         "TIMEOUT\r\nContent-Length: 0\r\n\r\n"
1673 static sip_msg_t *_faked_msg = NULL;
1674
1675 sip_msg_t *faked_msg()
1676 {
1677         if(_faked_msg == NULL) {
1678                 _faked_msg = pkg_malloc(sizeof(sip_msg_t));
1679                 if(likely(build_sip_msg_from_buf(_faked_msg, FAKED_SIP_408_MSG_FORMAT,
1680                                                   strlen(FAKED_SIP_408_MSG_FORMAT), inc_msg_no())
1681                                    < 0)) {
1682                         LM_ERR("failed to parse msg buffer\n");
1683                         return NULL;
1684                 }
1685         }
1686         return _faked_msg;
1687 }
1688
1689 void run_notify_reply_event(struct cell *t, struct tmcb_params *ps)
1690 {
1691         int backup_route_type;
1692         subs_t *backup_subs = NULL;
1693         sip_msg_t msg;
1694
1695         if(goto_on_notify_reply == -1)
1696                 return;
1697
1698         if(likely(build_sip_msg_from_buf(&msg, t->uac->request.buffer,
1699                                           t->uac->request.buffer_len, inc_msg_no())
1700                            < 0)) {
1701                 LM_ERR("failed to parse msg buffer\n");
1702                 return;
1703         }
1704
1705         _pres_subs_notify_reply_code = ps->code;
1706         if(ps->code == 408 || ps->rpl == NULL) {
1707                 _pres_subs_notify_reply_msg = faked_msg();
1708         } else {
1709                 _pres_subs_notify_reply_msg = ps->rpl;
1710         }
1711
1712         if(_pres_subs_mode==1) {
1713                 backup_subs = _pres_subs_last_sub;
1714                 _pres_subs_last_sub = mem_copy_subs((subs_t *)(*ps->param), PKG_MEM_TYPE);
1715         }
1716
1717         backup_route_type = get_route_type();
1718         set_route_type(LOCAL_ROUTE);
1719         run_top_route(event_rt.rlist[goto_on_notify_reply], &msg, 0);
1720         set_route_type(backup_route_type);
1721
1722         _pres_subs_notify_reply_msg = NULL;
1723         _pres_subs_notify_reply_code = 0;
1724         if(_pres_subs_mode==1) {
1725                 pkg_free(_pres_subs_last_sub);
1726                 _pres_subs_last_sub = backup_subs;
1727         }
1728         free_sip_msg(&msg);
1729 }
1730
1731 int pres_get_delete_sub(void)
1732 {
1733         sr_xavp_t *vavp = NULL;
1734         str vname = str_init("delete_subscription");
1735
1736         if(pres_xavp_cfg.s == NULL || pres_xavp_cfg.len <= 0) {
1737                 return 0;
1738         }
1739
1740         vavp = xavp_get_child_with_ival(&pres_xavp_cfg, &vname);
1741         if(vavp != NULL) {
1742                 return (int)vavp->val.v.i;
1743         }
1744
1745         return 0;
1746 }
1747
1748 void p_tm_callback(struct cell *t, int type, struct tmcb_params *ps)
1749 {
1750         subs_t *subs;
1751
1752         if(ps->param == NULL || *ps->param == NULL) {
1753                 LM_ERR("weird shit happening\n");
1754                 if(ps->param != NULL && *ps->param != NULL)
1755                         shm_free((subs_t *)(*ps->param));
1756                 return;
1757         }
1758
1759         subs = (subs_t *)(*ps->param);
1760         LM_DBG("completed with status %d [to_tag:%.*s]\n", ps->code,
1761                         subs->to_tag.len, subs->to_tag.s);
1762
1763         run_notify_reply_event(t, ps);
1764
1765         if(ps->code == 404 || ps->code == 481
1766                         || (ps->code == 408 && pres_timeout_rm_subs
1767                                            && subs->status != TERMINATED_STATUS)
1768                         || pres_get_delete_sub()) {
1769                 delete_subs(&subs->pres_uri, &subs->event->name, &subs->to_tag,
1770                                 &subs->from_tag, &subs->callid);
1771         }
1772
1773         shm_free(subs);
1774 }
1775
1776 void free_cbparam(c_back_param *cb_param)
1777 {
1778         if(cb_param != NULL)
1779                 shm_free(cb_param);
1780 }
1781
1782 c_back_param *shm_dup_cbparam(subs_t *subs)
1783 {
1784         int size;
1785         c_back_param *cb_param = NULL;
1786
1787         size = sizeof(c_back_param) + subs->pres_uri.len + subs->event->name.len
1788                    + subs->to_tag.len + subs->from_tag.len + subs->callid.len;
1789
1790         cb_param = (c_back_param *)shm_malloc(size);
1791         LM_DBG("=== %d/%d/%d\n", subs->pres_uri.len, subs->event->name.len,
1792                         subs->to_tag.len);
1793         if(cb_param == NULL) {
1794                 LM_ERR("no more shared memory\n");
1795                 return NULL;
1796         }
1797         memset(cb_param, 0, size);
1798
1799         cb_param->pres_uri.s = (char *)cb_param + sizeof(c_back_param);
1800         memcpy(cb_param->pres_uri.s, subs->pres_uri.s, subs->pres_uri.len);
1801         cb_param->pres_uri.len = subs->pres_uri.len;
1802         cb_param->ev_name.s =
1803                         (char *)(cb_param->pres_uri.s) + cb_param->pres_uri.len;
1804         memcpy(cb_param->ev_name.s, subs->event->name.s, subs->event->name.len);
1805         cb_param->ev_name.len = subs->event->name.len;
1806         cb_param->to_tag.s = (char *)(cb_param->ev_name.s) + cb_param->ev_name.len;
1807         memcpy(cb_param->to_tag.s, subs->to_tag.s, subs->to_tag.len);
1808         cb_param->to_tag.len = subs->to_tag.len;
1809
1810         cb_param->from_tag.s = (char *)(cb_param->to_tag.s) + cb_param->to_tag.len;
1811         memcpy(cb_param->from_tag.s, subs->from_tag.s, subs->from_tag.len);
1812         cb_param->from_tag.len = subs->from_tag.len;
1813
1814         cb_param->callid.s =
1815                         (char *)(cb_param->from_tag.s) + cb_param->from_tag.len;
1816         memcpy(cb_param->callid.s, subs->callid.s, subs->callid.len);
1817         cb_param->callid.len = subs->callid.len;
1818
1819         return cb_param;
1820 }
1821
1822
1823 str *create_winfo_xml(watcher_t *watchers, char *version, str resource,
1824                 str event, int STATE_FLAG)
1825 {
1826         xmlDocPtr doc = NULL;
1827         xmlNodePtr root_node = NULL, node = NULL;
1828         xmlNodePtr w_list_node = NULL;
1829         char content[200];
1830         str *body = NULL;
1831         char *res = NULL;
1832         watcher_t *w;
1833
1834         LIBXML_TEST_VERSION;
1835
1836         doc = xmlNewDoc(BAD_CAST "1.0");
1837         root_node = xmlNewNode(NULL, BAD_CAST "watcherinfo");
1838         xmlDocSetRootElement(doc, root_node);
1839
1840         xmlNewProp(root_node, BAD_CAST "xmlns",
1841                         BAD_CAST "urn:ietf:params:xml:ns:watcherinfo");
1842         xmlNewProp(root_node, BAD_CAST "version", BAD_CAST version);
1843
1844         if(STATE_FLAG & FULL_STATE_FLAG) {
1845                 if(xmlNewProp(root_node, BAD_CAST "state", BAD_CAST "full") == NULL) {
1846                         LM_ERR("while adding new attribute\n");
1847                         goto error;
1848                 }
1849         } else {
1850                 if(xmlNewProp(root_node, BAD_CAST "state", BAD_CAST "partial")
1851                                 == NULL) {
1852                         LM_ERR("while adding new attribute\n");
1853                         goto error;
1854                 }
1855         }
1856
1857         w_list_node = xmlNewChild(root_node, NULL, BAD_CAST "watcher-list", NULL);
1858         if(w_list_node == NULL) {
1859                 LM_ERR("while adding child\n");
1860                 goto error;
1861         }
1862         res = (char *)pkg_malloc(MAX_unsigned(resource.len, event.len) + 1);
1863         if(res == NULL) {
1864                 ERR_MEM(PKG_MEM_STR);
1865         }
1866         memcpy(res, resource.s, resource.len);
1867         res[resource.len] = '\0';
1868         xmlNewProp(w_list_node, BAD_CAST "resource", BAD_CAST res);
1869         memcpy(res, event.s, event.len);
1870         res[event.len] = '\0';
1871         xmlNewProp(w_list_node, BAD_CAST "package", BAD_CAST res);
1872         pkg_free(res);
1873
1874
1875         w = watchers->next;
1876         while(w) {
1877                 strncpy(content, w->uri.s, w->uri.len);
1878                 content[w->uri.len] = '\0';
1879                 node = xmlNewChild(
1880                                 w_list_node, NULL, BAD_CAST "watcher", BAD_CAST content);
1881                 if(node == NULL) {
1882                         LM_ERR("while adding child\n");
1883                         goto error;
1884                 }
1885                 if(xmlNewProp(node, BAD_CAST "id", BAD_CAST w->id.s) == NULL) {
1886                         LM_ERR("while adding new attribute\n");
1887                         goto error;
1888                 }
1889
1890                 if(xmlNewProp(node, BAD_CAST "event", BAD_CAST "subscribe") == NULL) {
1891                         LM_ERR("while adding new attribute\n");
1892                         goto error;
1893                 }
1894
1895                 if(xmlNewProp(
1896                                    node, BAD_CAST "status", BAD_CAST get_status_str(w->status))
1897                                 == NULL) {
1898                         LM_ERR("while adding new attribute\n");
1899                         goto error;
1900                 }
1901                 w = w->next;
1902         }
1903         body = (str *)pkg_malloc(sizeof(str));
1904         if(body == NULL) {
1905                 ERR_MEM(PKG_MEM_STR);
1906         }
1907         memset(body, 0, sizeof(str));
1908
1909         xmlDocDumpFormatMemory(doc, (xmlChar **)(void *)&body->s, &body->len, 1);
1910
1911         xmlFreeDoc(doc);
1912
1913         xmlCleanupParser();
1914
1915         xmlMemoryDump();
1916
1917         return body;
1918
1919 error:
1920         if(doc)
1921                 xmlFreeDoc(doc);
1922         return NULL;
1923 }
1924
1925 int watcher_found_in_list(watcher_t *watchers, str wuri)
1926 {
1927         watcher_t *w;
1928
1929         w = watchers->next;
1930
1931         while(w) {
1932                 if(w->uri.len == wuri.len
1933                                 && presence_sip_uri_match(&w->uri, &wuri) == 0)
1934                         return 1;
1935                 w = w->next;
1936         }
1937
1938         return 0;
1939 }
1940
1941 int add_waiting_watchers(watcher_t *watchers, str pres_uri, str event)
1942 {
1943         watcher_t *w;
1944         db_key_t query_cols[3];
1945         db_val_t query_vals[3];
1946         db_key_t result_cols[2];
1947         db1_res_t *result = NULL;
1948         db_row_t *row = NULL;
1949         db_val_t *row_vals;
1950         int n_result_cols = 0;
1951         int n_query_cols = 0;
1952         int wuser_col, wdomain_col;
1953         str wuser, wdomain, wuri;
1954         int i;
1955
1956         /* select from watchers table the users that have subscribed
1957          * to the presentity and have status pending */
1958
1959         query_cols[n_query_cols] = &str_presentity_uri_col;
1960         query_vals[n_query_cols].type = DB1_STR;
1961         query_vals[n_query_cols].nul = 0;
1962         query_vals[n_query_cols].val.str_val = pres_uri;
1963         n_query_cols++;
1964
1965         query_cols[n_query_cols] = &str_event_col;
1966         query_vals[n_query_cols].type = DB1_STR;
1967         query_vals[n_query_cols].nul = 0;
1968         query_vals[n_query_cols].val.str_val = event;
1969         n_query_cols++;
1970
1971         query_cols[n_query_cols] = &str_status_col;
1972         query_vals[n_query_cols].type = DB1_INT;
1973         query_vals[n_query_cols].nul = 0;
1974         query_vals[n_query_cols].val.int_val = PENDING_STATUS;
1975         n_query_cols++;
1976
1977         result_cols[wuser_col = n_result_cols++] = &str_watcher_username_col;
1978         result_cols[wdomain_col = n_result_cols++] = &str_watcher_domain_col;
1979
1980         if(pa_dbf.use_table(pa_db, &watchers_table) < 0) {
1981                 LM_ERR("sql use table 'watchers_table' failed\n");
1982                 return -1;
1983         }
1984
1985         if(pa_dbf.query(pa_db, query_cols, 0, query_vals, result_cols, n_query_cols,
1986                            n_result_cols, 0, &result)
1987                         < 0) {
1988                 LM_ERR("failed to query %.*s table\n", watchers_table.len,
1989                                 watchers_table.s);
1990                 if(result)
1991                         pa_dbf.free_result(pa_db, result);
1992                 return -1;
1993         }
1994
1995         if(result == NULL) {
1996                 LM_ERR("mysql query failed - null result\n");
1997                 return -1;
1998         }
1999
2000         if(result->n <= 0) {
2001                 LM_DBG("The query returned no result\n");
2002                 pa_dbf.free_result(pa_db, result);
2003                 return 0;
2004         }
2005
2006         for(i = 0; i < result->n; i++) {
2007                 row = &result->rows[i];
2008                 row_vals = ROW_VALUES(row);
2009
2010                 wuser.s = (char *)row_vals[wuser_col].val.string_val;
2011                 wuser.len = strlen(wuser.s);
2012
2013                 wdomain.s = (char *)row_vals[wdomain_col].val.string_val;
2014                 wdomain.len = strlen(wdomain.s);
2015
2016                 if(uandd_to_uri(wuser, wdomain, &wuri) < 0) {
2017                         LM_ERR("creating uri from username and domain\n");
2018                         goto error;
2019                 }
2020
2021                 if(watcher_found_in_list(watchers, wuri)) {
2022                         pkg_free(wuri.s);
2023                         continue;
2024                 }
2025
2026                 w = (watcher_t *)pkg_malloc(sizeof(watcher_t));
2027                 if(w == NULL) {
2028                         pkg_free(wuri.s);
2029                         ERR_MEM(PKG_MEM_STR);
2030                 }
2031                 memset(w, 0, sizeof(watcher_t));
2032
2033                 w->status = WAITING_STATUS;
2034                 w->uri = wuri;
2035                 w->id.s = (char *)pkg_malloc(w->uri.len * 2 + 1);
2036                 if(w->id.s == NULL) {
2037                         pkg_free(w->uri.s);
2038                         pkg_free(w);
2039                         ERR_MEM(PKG_MEM_STR);
2040                 }
2041
2042                 to64frombits((unsigned char *)w->id.s, (const unsigned char *)w->uri.s,
2043                                 w->uri.len);
2044                 w->id.len = strlen(w->id.s);
2045                 w->event = event;
2046
2047                 w->next = watchers->next;
2048                 watchers->next = w;
2049         }
2050
2051         pa_dbf.free_result(pa_db, result);
2052         return 0;
2053
2054 error:
2055         if(result)
2056                 pa_dbf.free_result(pa_db, result);
2057         return -1;
2058 }
2059
/* Make the str `strng` reference the zero-terminated C string `chars`
 * (no copy is made); a NULL pointer results in a NULL, zero-length str.
 * The definition deliberately has no trailing semicolon, so that
 * `EXTRACT_STRING(a, b);` is exactly one statement and can be used in an
 * unbraced if/else. */
#define EXTRACT_STRING(strng, chars)                             \
	do {                                                         \
		(strng).s = (char *)(chars);                             \
		(strng).len = (strng).s == NULL ? 0 : strlen((strng).s); \
	} while(0)
2065
2066 static int unset_watchers_updated_winfo(str *pres_uri)
2067 {
2068         db_key_t query_cols[3], result_cols[1], update_cols[1];
2069         db_val_t query_vals[3], update_vals[1];
2070         db_op_t query_ops[2];
2071         db1_res_t *result = NULL;
2072         int n_query_cols = 0;
2073         int ret = -1;
2074         str winfo = str_init("presence.winfo");
2075         db_query_f query_fn = pa_dbf.query_lock ? pa_dbf.query_lock : pa_dbf.query;
2076
2077         /* If this is the only presence.winfo dialog awaiting
2078            update for this presentity reset all of the watchers
2079            updated_winfo fields. */
2080
2081         query_cols[n_query_cols] = &str_presentity_uri_col;
2082         query_vals[n_query_cols].type = DB1_STR;
2083         query_vals[n_query_cols].nul = 0;
2084         query_vals[n_query_cols].val.str_val.s = pres_uri->s;
2085         query_vals[n_query_cols].val.str_val.len = pres_uri->len;
2086         n_query_cols++;
2087
2088         query_cols[n_query_cols] = &str_event_col;
2089         query_vals[n_query_cols].type = DB1_STR;
2090         query_vals[n_query_cols].nul = 0;
2091         query_vals[n_query_cols].val.str_val = winfo;
2092         n_query_cols++;
2093
2094         query_cols[n_query_cols] = &str_updated_col;
2095         query_vals[n_query_cols].type = DB1_INT;
2096         query_vals[n_query_cols].nul = 0;
2097         query_vals[n_query_cols].val.int_val = UPDATED_TYPE;
2098         n_query_cols++;
2099
2100         result_cols[0] = &str_id_col;
2101
2102         update_cols[0] = &str_updated_winfo_col;
2103         update_vals[0].type = DB1_INT;
2104         update_vals[0].nul = 0;
2105         update_vals[0].val.int_val = NO_UPDATE_TYPE;
2106
2107         if(pa_dbf.use_table(pa_db, &active_watchers_table) < 0) {
2108                 LM_ERR("use table failed\n");
2109                 goto error;
2110         }
2111
2112         if(query_fn(pa_db, query_cols, 0, query_vals, result_cols, n_query_cols, 1,
2113                            0, &result)
2114                         < 0) {
2115                 LM_ERR("in sql query\n");
2116                 goto error;
2117         }
2118
2119         if(result == NULL) {
2120                 LM_ERR("bad result\n");
2121                 goto error;
2122         }
2123
2124         if(RES_ROW_N(result) <= 0) {
2125                 query_ops[0] = OP_EQ;
2126                 query_ops[1] = OP_NEQ;
2127
2128                 if(pa_dbf.update(pa_db, query_cols, query_ops, query_vals, update_cols,
2129                                    update_vals, 2, 1)
2130                                 < 0) {
2131                         LM_ERR("in sql query\n");
2132                         goto error;
2133                 }
2134
2135                 if(pa_dbf.affected_rows)
2136                         ret = pa_dbf.affected_rows(pa_db);
2137                 else
2138                         ret = 0;
2139         } else
2140                 ret = 0;
2141
2142 error:
2143         if(result)
2144                 pa_dbf.free_result(pa_db, result);
2145         return ret;
2146 }
2147
2148 static int dialogs_awaiting_update(str *pres_uri, str event)
2149 {
2150         db_key_t query_cols[3], result_cols[1];
2151         db_val_t query_vals[3];
2152         db_op_t query_ops[3];
2153         db1_res_t *result = NULL;
2154         int n_query_cols = 0;
2155         int ret = -1;
2156         db_query_f query_fn = pa_dbf.query_lock ? pa_dbf.query_lock : pa_dbf.query;
2157
2158         query_cols[n_query_cols] = &str_presentity_uri_col;
2159         query_vals[n_query_cols].type = DB1_STR;
2160         query_vals[n_query_cols].nul = 0;
2161         query_vals[n_query_cols].val.str_val.s = pres_uri->s;
2162         query_vals[n_query_cols].val.str_val.len = pres_uri->len;
2163         query_ops[n_query_cols] = OP_EQ;
2164         n_query_cols++;
2165
2166         query_cols[n_query_cols] = &str_event_col;
2167         query_vals[n_query_cols].type = DB1_STR;
2168         query_vals[n_query_cols].nul = 0;
2169         query_vals[n_query_cols].val.str_val = event;
2170         query_ops[n_query_cols] = OP_EQ;
2171         n_query_cols++;
2172
2173         query_cols[n_query_cols] = &str_updated_col;
2174         query_vals[n_query_cols].type = DB1_INT;
2175         query_vals[n_query_cols].nul = 0;
2176         query_vals[n_query_cols].val.int_val = NO_UPDATE_TYPE;
2177         query_ops[n_query_cols] = OP_NEQ;
2178         n_query_cols++;
2179
2180         result_cols[0] = &str_id_col;
2181
2182         if(pa_dbf.use_table(pa_db, &active_watchers_table) < 0) {
2183                 LM_ERR("use table failed\n");
2184                 goto error;
2185         }
2186
2187         if(query_fn(pa_db, query_cols, query_ops, query_vals, result_cols,
2188                            n_query_cols, 1, 0, &result)
2189                         < 0) {
2190                 LM_ERR("in sql query\n");
2191                 goto error;
2192         }
2193
2194         if(result == NULL) {
2195                 LM_ERR("bad result\n");
2196                 goto error;
2197         } else
2198                 ret = RES_ROW_N(result);
2199
2200 error:
2201         if(result)
2202                 pa_dbf.free_result(pa_db, result);
2203         return ret;
2204 }
2205
2206 int set_wipeer_subs_updated(str *pres_uri, pres_ev_t *event, int full)
2207 {
2208         db_key_t query_cols[3], result_cols[3], update_cols[2];
2209         db_val_t query_vals[3], update_vals[2], *values;
2210         db_row_t *rows;
2211         db1_res_t *result = NULL;
2212         int n_query_cols = 0, n_result_cols = 0, n_update_cols = 0;
2213         int callid_col, from_tag_col, to_tag_col;
2214         int i, ret = -1, count;
2215         str callid, from_tag, to_tag;
2216         db_query_f query_fn = pa_dbf.query_lock ? pa_dbf.query_lock : pa_dbf.query;
2217
2218         query_cols[n_query_cols] = &str_presentity_uri_col;
2219         query_vals[n_query_cols].type = DB1_STR;
2220         query_vals[n_query_cols].nul = 0;
2221         query_vals[n_query_cols].val.str_val.s = pres_uri->s;
2222         query_vals[n_query_cols].val.str_val.len = pres_uri->len;
2223         n_query_cols++;
2224
2225         query_cols[n_query_cols] = &str_event_col;
2226         query_vals[n_query_cols].type = DB1_STR;
2227         query_vals[n_query_cols].nul = 0;
2228         query_vals[n_query_cols].val.str_val = event->name;
2229         n_query_cols++;
2230
2231         result_cols[callid_col = n_result_cols++] = &str_callid_col;
2232         result_cols[from_tag_col = n_result_cols++] = &str_from_tag_col;
2233         result_cols[to_tag_col = n_result_cols++] = &str_to_tag_col;
2234
2235         if(pa_dbf.use_table(pa_db, &active_watchers_table) < 0) {
2236                 LM_ERR("use table failed\n");
2237                 goto error;
2238         }
2239
2240         if(query_fn(pa_db, query_cols, 0, query_vals, result_cols, n_query_cols,
2241                            n_result_cols, 0, &result)
2242                         < 0) {
2243                 LM_ERR("in sql query\n");
2244                 goto error;
2245         }
2246
2247         if(result == NULL) {
2248                 LM_ERR("bad result\n");
2249                 goto error;
2250         }
2251
2252         if(RES_ROW_N(result) <= 0) {
2253                 ret = 0;
2254                 goto done;
2255         }
2256
2257         rows = RES_ROWS(result);
2258         count = RES_ROW_N(result);
2259         for(i = 0; i < RES_ROW_N(result); i++) {
2260                 values = ROW_VALUES(&rows[i]);
2261
2262                 EXTRACT_STRING(callid, VAL_STRING(&values[callid_col]));
2263                 EXTRACT_STRING(from_tag, VAL_STRING(&values[from_tag_col]));
2264                 EXTRACT_STRING(to_tag, VAL_STRING(&values[to_tag_col]));
2265
2266                 n_query_cols = 0;
2267                 n_update_cols = 0;
2268
2269                 query_cols[n_query_cols] = &str_callid_col;
2270                 query_vals[n_query_cols].type = DB1_STR;
2271                 query_vals[n_query_cols].nul = 0;
2272                 query_vals[n_query_cols].val.str_val = callid;
2273                 n_query_cols++;
2274
2275                 query_cols[n_query_cols] = &str_to_tag_col;
2276                 query_vals[n_query_cols].type = DB1_STR;
2277                 query_vals[n_query_cols].nul = 0;
2278                 query_vals[n_query_cols].val.str_val = to_tag;
2279                 n_query_cols++;
2280
2281                 query_cols[n_query_cols] = &str_from_tag_col;
2282                 query_vals[n_query_cols].type = DB1_STR;
2283                 query_vals[n_query_cols].nul = 0;
2284                 query_vals[n_query_cols].val.str_val = from_tag;
2285                 n_query_cols++;
2286
2287                 update_cols[n_update_cols] = &str_updated_col;
2288                 update_vals[n_update_cols].type = DB1_INT;
2289                 update_vals[n_update_cols].nul = 0;
2290                 update_vals[n_update_cols].val.int_val =
2291                                 core_case_hash(&callid, &from_tag, 0)
2292                                 % (pres_waitn_time * pres_notifier_poll_rate
2293                                                   * pres_notifier_processes);
2294                 n_update_cols++;
2295
2296                 if(full) {
2297                         update_cols[n_update_cols] = &str_updated_winfo_col;
2298                         update_vals[n_update_cols].type = DB1_INT;
2299                         update_vals[n_update_cols].nul = 0;
2300                         update_vals[n_update_cols].val.int_val = UPDATED_TYPE;
2301                         n_update_cols++;
2302                 }
2303
2304                 if(pa_dbf.update(pa_db, query_cols, 0, query_vals, update_cols,
2305                                    update_vals, n_query_cols, n_update_cols)
2306                                 < 0) {
2307                         LM_ERR("in sql query\n");
2308                         goto error;
2309                 }
2310
2311                 if(pa_dbf.affected_rows)
2312                         if(pa_dbf.affected_rows(pa_db) == 0)
2313                                 count--;
2314         }
2315
2316         ret = count;
2317
2318 done:
2319 error:
2320         if(result)
2321                 pa_dbf.free_result(pa_db, result);
2322
2323         return ret;
2324 }
2325
2326 int set_updated(subs_t *sub)
2327 {
2328         db_key_t query_cols[3], update_cols[1];
2329         db_val_t query_vals[3], update_vals[1];
2330         int n_query_cols = 0;
2331
2332         query_cols[n_query_cols] = &str_callid_col;
2333         query_vals[n_query_cols].type = DB1_STR;
2334         query_vals[n_query_cols].nul = 0;
2335         query_vals[n_query_cols].val.str_val = sub->callid;
2336         n_query_cols++;
2337
2338         query_cols[n_query_cols] = &str_to_tag_col;
2339         query_vals[n_query_cols].type = DB1_STR;
2340         query_vals[n_query_cols].nul = 0;
2341         query_vals[n_query_cols].val.str_val = sub->to_tag;
2342         n_query_cols++;
2343
2344         query_cols[n_query_cols] = &str_from_tag_col;
2345         query_vals[n_query_cols].type = DB1_STR;
2346         query_vals[n_query_cols].nul = 0;
2347         query_vals[n_query_cols].val.str_val = sub->from_tag;
2348         n_query_cols++;
2349
2350         update_cols[0] = &str_updated_col;
2351         update_vals[0].type = DB1_INT;
2352         update_vals[0].nul = 0;
2353         update_vals[0].val.int_val = core_case_hash(&sub->callid, &sub->from_tag, 0)
2354                                                                  % (pres_waitn_time * pres_notifier_poll_rate
2355                                                                                    * pres_notifier_processes);
2356
2357         if(pa_dbf.use_table(pa_db, &active_watchers_table) < 0) {
2358                 LM_ERR("use table failed\n");
2359                 return -1;
2360         }
2361
2362         if(pa_dbf.update(pa_db, query_cols, 0, query_vals, update_cols, update_vals,
2363                            n_query_cols, 1)
2364                         < 0) {
2365                 LM_ERR("in sql query\n");
2366                 return -1;
2367         }
2368
2369         if(pa_dbf.affected_rows)
2370                 return pa_dbf.affected_rows(pa_db);
2371         else
2372                 return 0;
2373 }
2374
2375 static watcher_t *build_watchers_list(subs_t *sub)
2376 {
2377         db_key_t query_cols[3], result_cols[4];
2378         db_val_t query_vals[3], *values;
2379         db_row_t *rows;
2380         db1_res_t *result = NULL;
2381         int n_query_cols = 0, n_result_cols = 0;
2382         int wuser_col, wdomain_col, callid_col, status_col;
2383         int i;
2384         subs_t sb;
2385         watcher_t *watchers = NULL;
2386
2387         watchers = (watcher_t *)pkg_malloc(sizeof(watcher_t));
2388         if(watchers == NULL) {
2389                 ERR_MEM(PKG_MEM_STR);
2390         }
2391         memset(watchers, 0, sizeof(watcher_t));
2392
2393         query_cols[n_query_cols] = &str_presentity_uri_col;
2394         query_vals[n_query_cols].type = DB1_STR;
2395         query_vals[n_query_cols].nul = 0;
2396         query_vals[n_query_cols].val.str_val = sub->pres_uri;
2397         n_query_cols++;
2398
2399         query_cols[n_query_cols] = &str_event_col;
2400         query_vals[n_query_cols].type = DB1_STR;
2401         query_vals[n_query_cols].nul = 0;
2402         query_vals[n_query_cols].val.str_val = sub->event->wipeer->name;
2403         n_query_cols++;
2404
2405         query_cols[n_query_cols] = &str_updated_winfo_col;
2406         query_vals[n_query_cols].type = DB1_INT;
2407         query_vals[n_query_cols].nul = 0;
2408         query_vals[n_query_cols].val.int_val = UPDATED_TYPE;
2409         n_query_cols++;
2410
2411         result_cols[wuser_col = n_result_cols++] = &str_watcher_username_col;
2412         result_cols[wdomain_col = n_result_cols++] = &str_watcher_domain_col;
2413         result_cols[callid_col = n_result_cols++] = &str_callid_col;
2414         result_cols[status_col = n_result_cols++] = &str_status_col;
2415
2416         if(pa_dbf.use_table(pa_db, &active_watchers_table) < 0) {
2417                 LM_ERR("use table failed\n");
2418                 goto error;
2419         }
2420
2421         if(pa_dbf.query(pa_db, query_cols, 0, query_vals, result_cols, n_query_cols,
2422                            n_result_cols, 0, &result)
2423                         < 0) {
2424                 LM_ERR("in sql query\n");
2425                 goto error;
2426         }
2427
2428         if(result == NULL) {
2429                 LM_ERR("bad result\n");
2430                 goto error;
2431         }
2432
2433         if(RES_ROW_N(result) <= 0)
2434                 goto done;
2435
2436         rows = RES_ROWS(result);
2437         for(i = 0; i < RES_ROW_N(result); i++) {
2438                 memset(&sb, 0, sizeof(subs_t));
2439                 values = ROW_VALUES(&rows[i]);
2440
2441                 EXTRACT_STRING(sb.watcher_user, VAL_STRING(&values[wuser_col]));
2442                 EXTRACT_STRING(sb.watcher_domain, VAL_STRING(&values[wdomain_col]));
2443                 EXTRACT_STRING(sb.callid, VAL_STRING(&values[callid_col]));
2444                 sb.status = VAL_INT(&values[status_col]);
2445
2446                 sb.event = sub->event->wipeer;
2447
2448                 if(add_watcher_list(&sb, watchers) < 0)
2449                         goto error;
2450         }
2451
2452 done:
2453         pa_dbf.free_result(pa_db, result);
2454         return watchers;
2455
2456 error:
2457         if(result)
2458                 pa_dbf.free_result(pa_db, result);
2459         free_watcher_list(watchers);
2460         return NULL;
2461 }
2462
2463 static int cleanup_missing_dialog(subs_t *sub)
2464 {
2465         int ret = -1, num_other_watchers = 0;
2466
2467         if(sub->event->type & WINFO_TYPE) {
2468                 if(unset_watchers_updated_winfo(&sub->pres_uri) < 0) {
2469                         LM_ERR("resetting updated_winfo flags\n");
2470                         goto error;
2471                 }
2472         } else if(sub->event->type & PUBL_TYPE) {
2473                 if((num_other_watchers = dialogs_awaiting_update(
2474                                         &sub->pres_uri, sub->event->name))
2475                                 < 0) {
2476                         LM_ERR("checking watchers\n");
2477                         goto error;
2478                 } else if(num_other_watchers == 0) {
2479                         if(delete_offline_presentities(&sub->pres_uri, sub->event) < 0) {
2480                                 LM_ERR("deleting presentity\n");
2481                                 goto error;
2482                         }
2483                 }
2484         }
2485
2486         ret = 0;
2487
2488 error:
2489         return ret;
2490 }
2491
2492 static int notifier_notify(subs_t *sub, int *updated, int *end_transaction)
2493 {
2494         str *nbody = NULL;
2495         watcher_t *watchers = NULL;
2496         int ret = 0, attempt_delete_presentities = 0;
2497
2498         *updated = 0;
2499
2500         /* Terminating dialog NOTIFY */
2501         if(sub->expires == 0 || sub->status == TERMINATED_STATUS) {
2502                 sub->status = TERMINATED_STATUS;
2503
2504                 if(sub->event->type & WINFO_TYPE) {
2505                         if(unset_watchers_updated_winfo(&sub->pres_uri) < 0) {
2506                                 LM_WARN("resetting updated_winfo flags\n");
2507
2508                                 if(pa_dbf.abort_transaction) {
2509                                         if(pa_dbf.abort_transaction(pa_db) < 0) {
2510                                                 LM_ERR("in abort_transaction\n");
2511                                                 goto error;
2512                                         }
2513                                 }
2514                                 *end_transaction = 0;
2515
2516                                 /* Make sure this gets tried again next time */
2517                                 *updated = 1;
2518                                 goto done;
2519                         }
2520                 } else {
2521                         str winfo = str_init("presence.winfo");
2522                         int num_other_watchers, num_winfos;
2523
2524                         if(sub->event->type & PUBL_TYPE) {
2525                                 if((num_other_watchers = dialogs_awaiting_update(
2526                                                         &sub->pres_uri, sub->event->name))
2527                                                 < 0) {
2528                                         LM_ERR("checking watchers\n");
2529                                         goto error;
2530                                 } else if(num_other_watchers == 0)
2531                                         attempt_delete_presentities = 1;
2532                         }
2533
2534                         if(sub->event->wipeer) {
2535                                 if((num_winfos = dialogs_awaiting_update(&sub->pres_uri, winfo))
2536                                                 < 0) {
2537                                         LM_ERR("checking winfos\n");
2538                                         goto error;
2539                                 }
2540
2541                                 if(sub->updated_winfo == UPDATED_TYPE && num_winfos > 0) {
2542                                         *updated = 1;
2543                                         goto done;
2544                                 }
2545                         }
2546                 }
2547         } else /* Non-terminating dialog */
2548         {
2549                 if(sub->event->type & WINFO_TYPE) /* presence.winfo dialog */
2550                 {
2551                         if(sub->updated_winfo == NO_UPDATE_TYPE) {
2552                                 /* Partial notify if
2553                                    updated_winfo == NO_UPDATE_TYPE */
2554                                 int len = 0;
2555                                 char *version_str = int2str(sub->version, &len);
2556                                 if(version_str == NULL) {
2557                                         LM_ERR("converting int to str\n");
2558                                         goto error;
2559                                 }
2560
2561                                 watchers = build_watchers_list(sub);
2562                                 if(watchers == NULL) {
2563                                         LM_ERR("in build_watchers_list\n");
2564                                         goto error;
2565                                 }
2566
2567                                 nbody = create_winfo_xml(watchers, version_str, sub->pres_uri,
2568                                                 sub->event->wipeer->name, PARTIAL_STATE_FLAG);
2569                                 if(nbody == NULL) {
2570                                         LM_ERR("in create_winfo_xml\n");
2571                                         goto error;
2572                                 }
2573
2574                         } else /* Full presence.winfo NOTIFY */
2575                                 sub->updated_winfo = NO_UPDATE_TYPE;
2576
2577                         if(unset_watchers_updated_winfo(&sub->pres_uri) < 0) {
2578                                 LM_WARN("resetting updated_winfo flags\n");
2579
2580                                 if(pa_dbf.abort_transaction) {
2581                                         if(pa_dbf.abort_transaction(pa_db) < 0) {
2582                                                 LM_ERR("in abort_transaction\n");
2583                                                 goto error;
2584                                         }
2585                                 }
2586                                 *end_transaction = 0;
2587
2588                                 /* Make sure this gets tried again next time */
2589                                 *updated = 1;
2590                                 goto done;
2591                         }
2592
2593                 } else if(sub->event->type & PUBL_TYPE) {
2594                         int num_other_watchers;
2595
2596                         if((num_other_watchers = dialogs_awaiting_update(
2597                                                 &sub->pres_uri, sub->event->name))
2598                                         < 0) {
2599                                 LM_ERR("checking watchers\n");
2600                                 goto error;
2601                         } else if(num_other_watchers == 0)
2602                                 attempt_delete_presentities = 1;
2603                 } else if(!pres_send_fast_notify)
2604                         goto done;
2605         }
2606
2607         if(notify(sub, NULL, nbody, 0, 0) < 0) {
2608                 LM_ERR("could not send notify\n");
2609                 goto error;
2610         }
2611
2612         ret = 1;
2613
2614 done:
2615         if(attempt_delete_presentities) {
2616                 if(delete_offline_presentities(&sub->pres_uri, sub->event) < 0) {
2617                         LM_ERR("deleting presentity\n");
2618                         goto error;
2619                 }
2620         }
2621
2622         free_notify_body(nbody, sub->event);
2623         free_watcher_list(watchers);
2624
2625         return ret;
2626
2627 error:
2628         free_notify_body(nbody, sub->event);
2629         free_watcher_list(watchers);
2630
2631         if(pa_dbf.abort_transaction) {
2632                 if(pa_dbf.abort_transaction(pa_db) < 0)
2633                         LM_ERR("in abort_transaction\n");
2634         }
2635         *end_transaction = 0;
2636
2637         return -1;
2638 }
2639
2640 int process_dialogs(int round, int presence_winfo)
2641 {
2642         db_key_t query_cols[3], result_cols[20], update_cols[4];
2643         db_val_t query_vals[3], update_vals[4], *values, *dvalues;
2644         db_op_t query_ops[2];
2645         db_row_t *rows, *drows;
2646         db1_res_t *dialog_list = NULL, *dialog = NULL;
2647         int n_query_cols = 0, n_result_cols = 0, n_update_cols = 0;
2648         int callid_col, to_tag_col, from_tag_col;
2649         int pres_uri_col, tuser_col, tdomain_col, fuser_col, fdomain_col;
2650         int wuser_col, wdomain_col, sockinfo_col, lcontact_col, contact_col;
2651         int rroute_col, event_id_col, reason_col, event_col, lcseq_col;
2652         int rcseq_col, status_col, version_col, updated_winfo_col, expires_col;
2653         int flags_col, user_agent_col;
2654         int i, notify_sent = 0, cached_updated_winfo, ret = -1;
2655         int end_transaction = 0;
2656         subs_t sub;
2657         str ev_sname, winfo = str_init("presence.winfo");
2658         int now = (int)time(NULL);
2659         int updated = 0;
2660         db_query_f query_fn = pa_dbf.query_lock ? pa_dbf.query_lock : pa_dbf.query;
2661
2662         query_cols[n_query_cols] = &str_updated_col;
2663         query_vals[n_query_cols].type = DB1_INT;
2664         query_vals[n_query_cols].nul = 0;
2665         query_vals[n_query_cols].val.int_val = round;
2666         query_ops[n_query_cols] = OP_EQ;
2667         n_query_cols++;
2668
2669         query_cols[n_query_cols] = &str_event_col;
2670         query_vals[n_query_cols].type = DB1_STR;
2671         query_vals[n_query_cols].nul = 0;
2672         query_vals[n_query_cols].val.str_val = winfo;
2673         query_ops[n_query_cols] = presence_winfo ? OP_EQ : OP_NEQ;
2674         n_query_cols++;
2675
2676         result_cols[pres_uri_col = n_result_cols++] = &str_presentity_uri_col;
2677         result_cols[callid_col = n_result_cols++] = &str_callid_col;
2678         result_cols[to_tag_col = n_result_cols++] = &str_to_tag_col;
2679         result_cols[from_tag_col = n_result_cols++] = &str_from_tag_col;
2680         result_cols[event_col = n_result_cols++] = &str_event_col;
2681
2682         update_cols[n_update_cols] = &str_updated_col;
2683         update_vals[n_update_cols].type = DB1_INT;
2684         update_vals[n_update_cols].nul = 0;
2685         update_vals[n_update_cols].val.int_val = NO_UPDATE_TYPE;
2686         n_update_cols++;
2687
2688         if(pa_dbf.use_table(pa_db, &active_watchers_table) < 0) {
2689                 LM_ERR("use table failed\n");
2690                 goto error;
2691         }
2692
2693         if(pa_dbf.start_transaction) {
2694                 if(pa_dbf.start_transaction(pa_db, pres_db_table_lock) < 0) {
2695                         LM_ERR("in start_transaction\n");
2696                         goto error;
2697                 }
2698         }
2699
2700         /* Step 1: Find active_watchers that require notification */
2701         if(query_fn(pa_db, query_cols, query_ops, query_vals, result_cols,
2702                            n_query_cols, n_result_cols, 0, &dialog_list)
2703                         < 0) {
2704                 LM_ERR("in sql query\n");
2705                 goto error;
2706         }
2707         if(dialog_list == NULL) {
2708                 LM_ERR("bad result\n");
2709                 goto error;
2710         }
2711
2712         if(dialog_list->n <= 0)
2713                 goto done;
2714
2715         /* Step 2: Update the records so they are not notified again */
2716         if(pa_dbf.update(pa_db, query_cols, query_ops, query_vals, update_cols,
2717                            update_vals, n_query_cols, n_update_cols)
2718                         < 0) {
2719                 LM_ERR("in sql update\n");
2720                 goto error;
2721         }
2722
2723         if(pa_dbf.end_transaction) {
2724                 if(pa_dbf.end_transaction(pa_db) < 0) {
2725                         LM_ERR("in end_transaction\n");
2726                         goto error;
2727                 }
2728         }
2729
2730         /* Step 3: Notify each watcher we found */
2731         rows = RES_ROWS(dialog_list);
2732         for(i = 0; i < RES_ROW_N(dialog_list); i++) {
2733                 n_query_cols = 0;
2734                 n_result_cols = 0;
2735                 n_update_cols = 0;
2736                 memset(&sub, 0, sizeof(subs_t));
2737                 values = ROW_VALUES(&rows[i]);
2738
2739                 EXTRACT_STRING(sub.pres_uri, VAL_STRING(&values[pres_uri_col]));
2740                 EXTRACT_STRING(sub.callid, VAL_STRING(&values[callid_col]));
2741                 EXTRACT_STRING(sub.to_tag, VAL_STRING(&values[to_tag_col]));
2742                 EXTRACT_STRING(sub.from_tag, VAL_STRING(&values[from_tag_col]));
2743                 EXTRACT_STRING(ev_sname, VAL_STRING(&values[event_col]));
2744                 sub.event = contains_event(&ev_sname, NULL);
2745                 if(sub.event == NULL) {
2746                         LM_ERR("event not found and set to NULL\n");
2747                         goto delete_dialog;
2748                 }
2749
2750                 query_cols[n_query_cols] = &str_callid_col;
2751                 query_vals[n_query_cols].type = DB1_STR;
2752                 query_vals[n_query_cols].nul = 0;
2753                 query_vals[n_query_cols].val.str_val = sub.callid;
2754                 n_query_cols++;
2755
2756                 query_cols[n_query_cols] = &str_to_tag_col;
2757                 query_vals[n_query_cols].type = DB1_STR;
2758                 query_vals[n_query_cols].nul = 0;
2759                 query_vals[n_query_cols].val.str_val = sub.to_tag;
2760                 n_query_cols++;
2761
2762                 query_cols[n_query_cols] = &str_from_tag_col;
2763                 query_vals[n_query_cols].type = DB1_STR;
2764                 query_vals[n_query_cols].nul = 0;
2765                 query_vals[n_query_cols].val.str_val = sub.from_tag;
2766                 n_query_cols++;
2767
2768                 result_cols[tuser_col = n_result_cols++] = &str_to_user_col;
2769                 result_cols[tdomain_col = n_result_cols++] = &str_to_domain_col;
2770                 result_cols[fuser_col = n_result_cols++] = &str_from_user_col;
2771                 result_cols[fdomain_col = n_result_cols++] = &str_from_domain_col;
2772                 result_cols[wuser_col = n_result_cols++] = &str_watcher_username_col;
2773                 result_cols[wdomain_col = n_result_cols++] = &str_watcher_domain_col;
2774                 result_cols[sockinfo_col = n_result_cols++] = &str_socket_info_col;
2775                 result_cols[lcontact_col = n_result_cols++] = &str_local_contact_col;
2776                 result_cols[contact_col = n_result_cols++] = &str_contact_col;
2777                 result_cols[rroute_col = n_result_cols++] = &str_record_route_col;
2778                 result_cols[event_id_col = n_result_cols++] = &str_event_id_col;
2779                 result_cols[reason_col = n_result_cols++] = &str_reason_col;
2780                 result_cols[lcseq_col = n_result_cols++] = &str_local_cseq_col;
2781                 result_cols[rcseq_col = n_result_cols++] = &str_remote_cseq_col;
2782                 result_cols[status_col = n_result_cols++] = &str_status_col;
2783                 result_cols[version_col = n_result_cols++] = &str_version_col;
2784                 result_cols[updated_winfo_col = n_result_cols++] =
2785                                 &str_updated_winfo_col;
2786                 result_cols[expires_col = n_result_cols++] = &str_expires_col;
2787                 result_cols[flags_col = n_result_cols++] = &str_flags_col;
2788                 result_cols[user_agent_col = n_result_cols++] = &str_user_agent_col;
2789
2790                 /* Need to redo this here as we might have switched to the
2791                    presentity table during a previous iteration. */
2792                 if(pa_dbf.use_table(pa_db, &active_watchers_table) < 0) {
2793                         LM_ERR("use table failed\n");
2794                         goto error;
2795                 }
2796
2797                 if(pa_dbf.start_transaction) {
2798                         if(pa_dbf.start_transaction(pa_db, pres_db_table_lock) < 0) {
2799                                 LM_ERR("in start_transaction\n");
2800                                 goto error;
2801                         }
2802                 }
2803                 end_transaction = 1;
2804
2805                 if(query_fn(pa_db, query_cols, 0, query_vals, result_cols, n_query_cols,
2806                                    n_result_cols, 0, &dialog)
2807                                 < 0) {
2808                         LM_ERR("in sql query\n");
2809                         goto error;
2810                 }
2811
2812                 if(dialog == NULL) {
2813                         LM_ERR("bad result\n");
2814                         goto error;
2815                 }
2816
2817                 if(dialog->n <= 0) {
2818                         LM_INFO("record not found - this may be observed in multi-server "
2819                                         "systems\n");
2820                         if(cleanup_missing_dialog(&sub) < 0)
2821                                 LM_ERR("cleaning up after missing record\n");
2822                         goto next_dialog;
2823                 }
2824
2825                 if(dialog->n > 1) {
2826                         LM_ERR("multiple records found for %.*s, ci : %.*s, tt : %.*s, ft "
2827                                    ": %.*s, ev : %.*s\n",
2828                                         sub.pres_uri.len, sub.pres_uri.s, sub.callid.len,
2829                                         sub.callid.s, sub.to_tag.len, sub.to_tag.s,
2830                                         sub.from_tag.len, sub.from_tag.s, ev_sname.len, ev_sname.s);
2831                         goto delete_dialog;
2832                 }
2833
2834                 drows = RES_ROWS(dialog);
2835                 dvalues = ROW_VALUES(drows);
2836
2837                 EXTRACT_STRING(sub.to_user, VAL_STRING(&dvalues[tuser_col]));
2838                 EXTRACT_STRING(sub.to_domain, VAL_STRING(&dvalues[tdomain_col]));
2839                 EXTRACT_STRING(sub.from_user, VAL_STRING(&dvalues[fuser_col]));
2840                 EXTRACT_STRING(sub.from_domain, VAL_STRING(&dvalues[fdomain_col]));
2841                 EXTRACT_STRING(sub.watcher_user, VAL_STRING(&dvalues[wuser_col]));
2842                 EXTRACT_STRING(sub.watcher_domain, VAL_STRING(&dvalues[wdomain_col]));
2843                 EXTRACT_STRING(sub.sockinfo_str, VAL_STRING(&dvalues[sockinfo_col]));
2844                 EXTRACT_STRING(sub.local_contact, VAL_STRING(&dvalues[lcontact_col]));
2845                 EXTRACT_STRING(sub.contact, VAL_STRING(&dvalues[contact_col]));
2846                 EXTRACT_STRING(sub.record_route, VAL_STRING(&dvalues[rroute_col]));
2847                 EXTRACT_STRING(sub.event_id, VAL_STRING(&dvalues[event_id_col]));
2848                 EXTRACT_STRING(sub.reason, VAL_STRING(&dvalues[reason_col]));
2849                 EXTRACT_STRING(sub.user_agent, VAL_STRING(&dvalues[user_agent_col]));
2850
2851                 sub.local_cseq = VAL_INT(&dvalues[lcseq_col]) + 1;
2852                 sub.remote_cseq = VAL_INT(&dvalues[rcseq_col]);
2853                 sub.status = VAL_INT(&dvalues[status_col]);
2854                 sub.version = VAL_INT(&dvalues[version_col]) + 1;
2855                 cached_updated_winfo = sub.updated_winfo =
2856                                 VAL_INT(&dvalues[updated_winfo_col]);
2857
2858                 if(VAL_INT(&dvalues[expires_col]) > now + pres_expires_offset)
2859                         sub.expires = VAL_INT(&dvalues[expires_col]) - now;
2860                 else
2861                         sub.expires = 0;
2862                 sub.flags = VAL_INT(&dvalues[flags_col]);
2863
2864                 sub.updated = round;
2865
2866                 if((notify_sent = notifier_notify(&sub, &updated, &end_transaction))
2867                                 < 0) {
2868                         LM_ERR("sending NOTIFY request\n");
2869
2870                         if(cleanup_missing_dialog(&sub) < 0)
2871                                 LM_ERR("cleaning up after error sending NOTIFY"
2872                                            "request\n");
2873
2874                         /* remove the dialog and continue */
2875                         goto delete_dialog;
2876                 }
2877
2878                 if(pa_dbf.use_table(pa_db, &active_watchers_table) < 0) {
2879                         LM_ERR("use table failed\n");
2880                         goto error;
2881                 }
2882
2883                 if((sub.expires > 0 && sub.status != TERMINATED_STATUS) || updated) {
2884                         if(sub.updated_winfo != cached_updated_winfo) {
2885                                 update_cols[n_update_cols] = &str_updated_winfo_col;
2886                                 update_vals[n_update_cols].type = DB1_INT;
2887                                 update_vals[n_update_cols].nul = 0;
2888                                 update_vals[n_update_cols].val.int_val = sub.updated_winfo;
2889                                 n_update_cols++;
2890                         }
2891
2892                         if(updated) {
2893                                 update_cols[n_update_cols] = &str_updated_col;
2894                                 update_vals[n_update_cols].type = DB1_INT;
2895                                 update_vals[n_update_cols].nul = 0;
2896                                 update_vals[n_update_cols].val.int_val = round;
2897                                 n_update_cols++;
2898                         }
2899
2900                         if(notify_sent) {
2901                                 update_cols[n_update_cols] = &str_local_cseq_col;
2902                                 update_vals[n_update_cols].type = DB1_INT;
2903                                 update_vals[n_update_cols].nul = 0;
2904                                 update_vals[n_update_cols].val.int_val = sub.local_cseq;
2905                                 n_update_cols++;
2906
2907                                 update_cols[n_update_cols] = &str_version_col;
2908                                 update_vals[n_update_cols].type = DB1_INT;
2909                                 update_vals[n_update_cols].nul = 0;
2910                                 update_vals[n_update_cols].val.int_val = sub.version;
2911                                 n_update_cols++;
2912                         }
2913
2914                         if(n_update_cols > 0) {
2915                                 if(pa_dbf.update(pa_db, query_cols, 0, query_vals, update_cols,
2916                                                    update_vals, n_query_cols, n_update_cols)
2917                                                 < 0) {
2918                                         LM_ERR("in sql update\n");
2919                                         goto error;
2920                                 }
2921                         }
2922
2923                 } else if(notify_sent) {
2924                 delete_dialog:
2925                         if(pa_dbf.use_table(pa_db, &active_watchers_table) < 0) {
2926                                 LM_ERR("use table failed\n");
2927                                 goto error;
2928                         }
2929
2930                         if(pa_dbf.delete(pa_db, query_cols, 0, query_vals, n_query_cols)
2931                                         < 0) {
2932                                 LM_ERR("in sql delete");
2933                                 goto error;
2934                         }
2935                 }
2936
2937         next_dialog:
2938                 if(pa_dbf.end_transaction && end_transaction) {
2939                         if(pa_dbf.end_transaction(pa_db) < 0) {
2940                                 LM_ERR("in end_transaction\n");
2941                                 goto error;
2942                         }
2943                 }
2944
2945                 pa_dbf.free_result(pa_db, dialog);
2946                 dialog = NULL;
2947         }
2948
2949 done:
2950         ret = 0;
2951 error:
2952         if(dialog_list)
2953                 pa_dbf.free_result(pa_db, dialog_list);
2954         if(dialog)
2955                 pa_dbf.free_result(pa_db, dialog);
2956
2957         if(pa_dbf.abort_transaction) {
2958                 if(pa_dbf.abort_transaction(pa_db) < 0)
2959                         LM_ERR("in abort_transaction\n");
2960         }
2961
2962         return ret;
2963 }
2964
2965 void pres_timer_send_notify(unsigned int ticks, void *param)
2966 {
2967         int process_num = *((int *)param);
2968         int round =
2969                         subset + (pres_waitn_time * pres_notifier_poll_rate * process_num);
2970
2971         if(++subset > (pres_waitn_time * pres_notifier_poll_rate) - 1)
2972                 subset = 0;
2973
2974         if(process_dialogs(round, 0) < 0) {
2975                 LM_ERR("Handling non presence.winfo dialogs\n");
2976                 return;
2977         }
2978         if(process_dialogs(round, 1) < 0) {
2979                 LM_ERR("Handling presence.winfo dialogs\n");
2980                 return;
2981         }
2982 }