6f7098ea7a5f2012a4879778a9f64dc91f042d1f
[sip-router] / modules_k / presence / subscribe.c
1 /*
2  * $Id$
3  *
4  * presence module - presence server implementation
5  *
6  * Copyright (C) 2006 Voice Sistem S.R.L.
7  *
8  * This file is part of openser, a free SIP serves.
9  *
10  * Kamailio is free software; you can redistribute it and/or modify
11  * it under the terms of the GNU General Public License as published by
12  * the Free Software Foundation; either version 2 of the License, or
13  * (at your option) any later version
14  *
15  * Kamailio is distributed in the hope that it will be useful,
16  * but WITHOUT ANY WARRANTY; without even the implied warranty of
17  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
18  * GNU General Public License for more details.
19  *
20  * You should have received a copy of the GNU General Public License 
21  * along with this program; if not, write to the Free Software 
22  * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
23  *
24  * History:
25  * --------
26  *  2006-08-15  initial version (anca)
27  */
28
29 /*! \file
30  * \brief OpenSER presence module :: Support for SUBSCRIBE handling
31  * \ingroup presence 
32  */
33
34
35 #include "../../ut.h"
36 #include "../../usr_avp.h"
37 #include "../../data_lump_rpl.h"
38 #include "../../parser/parse_expires.h"
39 #include "../../parser/parse_event.h"
40 #include "../../parser/contact/parse_contact.h"
41 #include "presence.h"
42 #include "subscribe.h"
43 #include "utils_func.h"
44 #include "notify.h"
45 #include "../pua/hash.h"
46
47 int get_stored_info(struct sip_msg* msg, subs_t* subs, int* error_ret,
48                 str* reply_str);
49 int get_database_info(struct sip_msg* msg, subs_t* subs, int* error_ret,
50                 str* reply_str);
51 int get_db_subs_auth(subs_t* subs, int* found);
52 int insert_db_subs_auth(subs_t* subs);
53
54 static str su_200_rpl  = str_init("OK");
55 static str pu_481_rpl  = str_init("Subscription does not exist");
56 static str pu_400_rpl  = str_init("Bad request");
57 static str pu_500_rpl  = str_init("Server Internal Error");
58 static str pu_489_rpl  = str_init("Bad Event");
59
60
61 int send_2XX_reply(struct sip_msg * msg, int reply_code, int lexpire,
62                 str *rtag, str* local_contact)
63 {
64         static str hdr_append;
65         
66         hdr_append.s = (char *)pkg_malloc( sizeof(char)*(local_contact->len+ 128));
67         if(hdr_append.s == NULL)
68         {
69                 ERR_MEM(PKG_MEM_STR);
70         }
71         hdr_append.len = sprintf(hdr_append.s, "Expires: %d\r\n", lexpire);     
72         
73         strncpy(hdr_append.s+hdr_append.len ,"Contact: <", 10);
74         hdr_append.len += 10;
75         strncpy(hdr_append.s+hdr_append.len, local_contact->s, local_contact->len);
76         hdr_append.len+= local_contact->len;
77         strncpy(hdr_append.s+hdr_append.len, ">", 1);
78         hdr_append.len += 1;
79         strncpy(hdr_append.s+hdr_append.len, CRLF, CRLF_LEN);
80         hdr_append.len += CRLF_LEN;
81
82         hdr_append.s[hdr_append.len]= '\0';
83         
84         if (add_lump_rpl( msg, hdr_append.s, hdr_append.len, LUMP_RPL_HDR)==0 )
85         {
86                 LM_ERR("unable to add lump_rl\n");
87                 goto error;
88         }
89
90         if( slb.reply_dlg( msg, reply_code, &su_200_rpl, rtag)== -1)
91         {
92                 LM_ERR("sending reply\n");
93                 goto error;
94         }
95         
96         pkg_free(hdr_append.s);
97         return 0;
98
99 error:
100
101         pkg_free(hdr_append.s);
102         return -1;
103 }
104
105
106 int delete_db_subs(str pres_uri, str ev_stored_name, str to_tag)
107 {
108         db_key_t query_cols[5];
109         db_val_t query_vals[5];
110         int n_query_cols= 0;
111
112         query_cols[n_query_cols] = &str_presentity_uri_col;
113         query_vals[n_query_cols].type = DB_STR;
114         query_vals[n_query_cols].nul = 0;
115         query_vals[n_query_cols].val.str_val = pres_uri;
116         n_query_cols++;
117
118         query_cols[n_query_cols] = &str_event_col;
119         query_vals[n_query_cols].type = DB_STR;
120         query_vals[n_query_cols].nul = 0;
121         query_vals[n_query_cols].val.str_val = ev_stored_name;
122         n_query_cols++;
123
124         query_cols[n_query_cols] = &str_to_tag_col;
125         query_vals[n_query_cols].type = DB_STR;
126         query_vals[n_query_cols].nul = 0;
127         query_vals[n_query_cols].val.str_val = to_tag;
128         n_query_cols++;
129         
130         if (pa_dbf.use_table(pa_db, &active_watchers_table) < 0) 
131         {
132                 LM_ERR("in use table sql operation\n");
133                 return -1;
134         }
135
136         if(pa_dbf.delete(pa_db, query_cols, 0, query_vals,
137                                 n_query_cols)< 0 )
138         {
139                 LM_ERR("sql delete failed\n");
140                 return -1;
141         }
142
143         return 0;
144 }
145
146 int update_subs_db(subs_t* subs, int type)
147 {
148         db_key_t query_cols[22], update_keys[7];
149         db_val_t query_vals[22], update_vals[7];
150         int n_update_cols= 0;
151         int n_query_cols = 0;
152
153         query_cols[n_query_cols] = &str_presentity_uri_col;
154         query_vals[n_query_cols].type = DB_STR;
155         query_vals[n_query_cols].nul = 0;
156         query_vals[n_query_cols].val.str_val = subs->pres_uri;
157         n_query_cols++;
158         
159         query_cols[n_query_cols] = &str_watcher_username_col;
160         query_vals[n_query_cols].type = DB_STR;
161         query_vals[n_query_cols].nul = 0;
162         query_vals[n_query_cols].val.str_val = subs->from_user;
163         n_query_cols++;
164         
165         query_cols[n_query_cols] = &str_watcher_domain_col;
166         query_vals[n_query_cols].type = DB_STR;
167         query_vals[n_query_cols].nul = 0;
168         query_vals[n_query_cols].val.str_val = subs->from_domain;
169         n_query_cols++;
170
171         query_cols[n_query_cols] = &str_event_col;
172         query_vals[n_query_cols].type = DB_STR;
173         query_vals[n_query_cols].nul = 0;
174         query_vals[n_query_cols].val.str_val = subs->event->name;
175         n_query_cols++;
176
177         if(subs->event_id.s)
178         {
179                 query_cols[n_query_cols] = &str_event_id_col;
180                 query_vals[n_query_cols].type = DB_STR;
181                 query_vals[n_query_cols].nul = 0;
182                 query_vals[n_query_cols].val.str_val = subs->event_id;
183                 n_query_cols++;
184         }
185         query_cols[n_query_cols] = &str_callid_col;
186         query_vals[n_query_cols].type = DB_STR;
187         query_vals[n_query_cols].nul = 0;
188         query_vals[n_query_cols].val.str_val = subs->callid;
189         n_query_cols++;
190
191         query_cols[n_query_cols] = &str_to_tag_col;
192         query_vals[n_query_cols].type = DB_STR;
193         query_vals[n_query_cols].nul = 0;
194         query_vals[n_query_cols].val.str_val = subs->to_tag;
195         n_query_cols++;
196
197         query_cols[n_query_cols] = &str_from_tag_col;
198         query_vals[n_query_cols].type = DB_STR;
199         query_vals[n_query_cols].nul = 0;
200         query_vals[n_query_cols].val.str_val = subs->from_tag;
201         n_query_cols++;
202
203         if(type & REMOTE_TYPE)
204         {
205                 update_keys[n_update_cols] = &str_expires_col;
206                 update_vals[n_update_cols].type = DB_INT;
207                 update_vals[n_update_cols].nul = 0;
208                 update_vals[n_update_cols].val.int_val = subs->expires + (int)time(NULL);
209                 n_update_cols++;
210         
211                 update_keys[n_update_cols] = &str_remote_cseq_col;
212                 update_vals[n_update_cols].type = DB_INT;
213                 update_vals[n_update_cols].nul = 0;
214                 update_vals[n_update_cols].val.int_val = subs->remote_cseq; 
215                 n_update_cols++;
216         }
217         else
218         {       
219                 update_keys[n_update_cols] = &str_local_cseq_col;
220                 update_vals[n_update_cols].type = DB_INT;
221                 update_vals[n_update_cols].nul = 0;
222                 update_vals[n_update_cols].val.int_val = subs->local_cseq+ 1;
223                 n_update_cols++;
224         
225                 update_keys[n_update_cols] = &str_version_col;
226                 update_vals[n_update_cols].type = DB_INT;
227                 update_vals[n_update_cols].nul = 0;
228                 update_vals[n_update_cols].val.int_val = subs->version+ 1; 
229                 n_update_cols++;
230         }
231
232         update_keys[n_update_cols] = &str_status_col;
233         update_vals[n_update_cols].type = DB_INT;
234         update_vals[n_update_cols].nul = 0;
235         update_vals[n_update_cols].val.int_val = subs->status;
236         n_update_cols++;
237
238         update_keys[n_update_cols] = &str_reason_col;
239         update_vals[n_update_cols].type = DB_STR;
240         update_vals[n_update_cols].nul = 0;
241         update_vals[n_update_cols].val.str_val = subs->reason;
242         n_update_cols++;
243         
244         if (pa_dbf.use_table(pa_db, &active_watchers_table) < 0)
245         {
246                 LM_ERR("in use table sql operation\n"); 
247                 return -1;
248         }
249                 
250         if( pa_dbf.update( pa_db,query_cols, 0, query_vals,
251                                 update_keys, update_vals, n_query_cols,n_update_cols)<0) 
252         {
253                 LM_ERR("updating presence information\n");
254                 return -1;
255         }
256         return 0;
257 }
258
259 int update_subscription(struct sip_msg* msg, subs_t* subs, int to_tag_gen,
260                 int* sent_reply)
261 {       
262         unsigned int hash_code;
263         
264         printf_subs(subs);      
265         
266         *sent_reply= 0;
267
268         hash_code= core_hash(&subs->pres_uri, &subs->event->name, shtable_size);
269
270         if( to_tag_gen ==0) /*if a SUBSCRIBE within a dialog */
271         {
272                 if(subs->expires == 0)
273                 {
274                         LM_DBG("expires =0 -> deleting record\n");
275                 
276                         if( delete_db_subs(subs->pres_uri, 
277                                                 subs->event->name, subs->to_tag)< 0)
278                         {
279                                 LM_ERR("deleting subscription record from database\n");
280                                 goto error;
281                         }
282                         /* delete record from hash table also */
283
284                         subs->local_cseq= delete_shtable(subs_htable,hash_code,
285                                         subs->to_tag);
286                 
287                         if(subs->event->type & PUBL_TYPE)
288                         {       
289                                 if( send_2XX_reply(msg, 202, subs->expires, &subs->to_tag,
290                                                         &subs->local_contact) <0)
291                                 {
292                                         LM_ERR("sending 202 OK\n");
293                                         goto error;
294                                 }
295                                 *sent_reply= 1;
296                                 if(subs->event->wipeer)
297                                 {
298                                         if(query_db_notify(&subs->pres_uri,
299                                                                 subs->event->wipeer, NULL)< 0)
300                                         {
301                                                 LM_ERR("Could not send notify for winfo\n");
302                                                 goto error;
303                                         }
304                                 }
305
306                         }       
307                         else /* if unsubscribe for winfo */
308                         {
309                                 if( send_2XX_reply(msg, 200, subs->expires, &subs->to_tag,
310                                                         &subs->local_contact) <0)
311                                 {
312                                         LM_ERR("sending 200 OK reply\n");
313                                         goto error;
314                                 }
315                                 *sent_reply= 1;
316                         }
317                 
318                         if(notify(subs, NULL, NULL, 0)< 0)
319                         {
320                                 LM_ERR("Could not send notify\n");
321                                 goto error;
322                         }
323                         return 1;
324                 }
325
326                 if(update_shtable(subs_htable, hash_code, subs, REMOTE_TYPE)< 0)
327                 {
328                         if(fallback2db)
329                         {
330                                 /* update in database table */
331                                 if(update_subs_db(subs, REMOTE_TYPE)< 0)
332                                 {
333                                         LM_ERR("updating subscription in database table\n");
334                                         goto error;
335                                 }
336                         }
337                         else
338                         {
339                                 LM_ERR("updating subscription record in hash table\n");
340                                 goto error;
341                         }
342                 }
343         }
344         else
345         {
346                 if(subs->expires!= 0)
347                 {       
348                         if(insert_shtable(subs_htable,hash_code,subs)< 0)
349                         {
350                                 LM_ERR("inserting new record in subs_htable\n");
351                                 goto error;
352                         }
353                 }
354                 /*otherwise there is a subscription outside a dialog with expires= 0 
355                  * no update in database, but should try to send Notify */
356         }
357
358 /* reply_and_notify  */
359
360         if(subs->event->type & PUBL_TYPE)
361         {       
362                 if(send_2XX_reply(msg, 202, subs->expires,&subs->to_tag,
363                                         &subs->local_contact)<0)
364                 {
365                         LM_ERR("sending 202 OK reply\n");
366                         goto error;
367                 }
368                 *sent_reply= 1;
369                 
370                 if(subs->expires!= 0 && subs->event->wipeer)
371                 {       
372                         LM_DBG("send Notify with winfo\n");
373                         if(query_db_notify(&subs->pres_uri, subs->event->wipeer, subs)< 0)
374                         {
375                                 LM_ERR("Could not send notify winfo\n");
376                                 goto error;
377                         }       
378                         if(subs->send_on_cback== 0)
379                         {       
380                                 if(notify(subs, NULL, NULL, 0)< 0)
381                                 {
382                                         LM_ERR("Could not send notify\n");
383                                         goto error;
384                                 }
385                         }
386                 }
387                 else
388                 {
389                         if(notify(subs, NULL, NULL, 0)< 0)
390                         {
391                                 LM_ERR("Could not send notify\n");
392                                 goto error;
393                         }
394                 }       
395                         
396         }
397         else 
398         {
399                 if( send_2XX_reply(msg, 200, subs->expires, &subs->to_tag,
400                                         &subs->local_contact)<0)
401                 {
402                         LM_ERR("sending 200 OK reply\n");
403                         goto error;
404                 }               
405                 *sent_reply= 1;
406                 
407                 if(notify(subs, NULL, NULL, 0 )< 0)
408                 {
409                         LM_ERR("sending notify request\n");
410                         goto error;
411                 }
412         }
413         return 0;
414         
415 error:
416
417         LM_ERR("occured\n");
418         return -1;
419
420 }
421
422 void msg_watchers_clean(unsigned int ticks,void *param)
423 {
424         db_key_t db_keys[3], result_cols[1];
425         db_val_t db_vals[3];
426         db_op_t  db_ops[3] ;
427         db_res_t *result= NULL;
428
429         LM_DBG("cleaning pending subscriptions\n");
430         
431         db_keys[0] = &str_inserted_time_col;
432         db_ops[0] = OP_LT;
433         db_vals[0].type = DB_INT;
434         db_vals[0].nul = 0;
435         db_vals[0].val.int_val = (int)time(NULL)- 24*3600 ;
436
437         db_keys[1] = &str_status_col;
438         db_ops [1] = OP_EQ;
439         db_vals[1].type = DB_INT;
440         db_vals[1].nul = 0;
441         db_vals[1].val.int_val = PENDING_STATUS;
442         
443         result_cols[0]= &str_id_col;
444
445         if (pa_dbf.use_table(pa_db, &watchers_table) < 0) 
446         {
447                 LM_ERR("unsuccessful use table sql operation\n");
448                 return ;
449         }
450         
451         if(pa_dbf.query(pa_db, db_keys, db_ops, db_vals, result_cols, 2, 1, 0, &result )< 0)
452         {
453                 LM_ERR("querying database for expired messages\n");
454                 if(result)
455                         pa_dbf.free_result(pa_db, result);
456                 return;
457         }
458         if(result == NULL)
459                 return;
460         if(result->n <= 0)
461         {
462                 pa_dbf.free_result(pa_db, result);
463                 return;
464         }
465         pa_dbf.free_result(pa_db, result);
466
467         if (pa_dbf.delete(pa_db, db_keys, db_ops, db_vals, 2) < 0) 
468                 LM_ERR("cleaning pending subscriptions\n");
469 }
470
471 int handle_subscribe(struct sip_msg* msg, char* str1, char* str2)
472 {
473         int  to_tag_gen = 0;
474         subs_t subs;
475         pres_ev_t* event= NULL;
476         event_t* parsed_event= NULL;
477         param_t* ev_param= NULL;
478         int found;
479         str reason= {0, 0};
480         struct sip_uri uri;
481         int reply_code;
482         str reply_str;
483         int sent_reply= 0;
484
485         /* ??? rename to avoid collisions with other symbols */
486         counter++;
487
488         memset(&subs, 0, sizeof(subs_t));
489         
490         reply_code= 500;
491         reply_str= pu_500_rpl;
492
493         if( parse_headers(msg,HDR_EOH_F, 0)==-1 )
494         {
495                 LM_ERR("parsing headers\n");
496                 reply_code= 400;
497                 reply_str= pu_400_rpl;
498                 goto error;
499         }
500         
501         /* inspecting the Event header field */
502         if(msg->event && msg->event->body.len > 0)
503         {
504                 if (!msg->event->parsed && (parse_event(msg->event) < 0))
505                 {
506                         reply_code= 400;
507                         reply_str= pu_400_rpl;
508                         goto error;
509                 }
510                 if(((event_t*)msg->event->parsed)->parsed & EVENT_OTHER)
511                 {       
512                         goto bad_event;
513                 }
514         }
515         else
516                 goto bad_event;
517
518         /* search event in the list */
519         parsed_event= (event_t*)msg->event->parsed;
520         event= search_event(parsed_event);
521         if(event== NULL)
522         {
523                 goto bad_event;
524         }
525         subs.event= event;
526         
527         /* extract the id if any*/
528         ev_param= parsed_event->params;
529         while(ev_param)
530         {
531                 if(ev_param->name.len== 2 && strncmp(ev_param->name.s, "id", 2)== 0)
532                 {
533                         subs.event_id= ev_param->body;
534                         break;
535                 }
536                 ev_param= ev_param->next;
537         }               
538         
539         if(extract_sdialog_info(&subs, msg, max_expires, &to_tag_gen)< 0)
540         {
541                 LM_ERR("failed to extract dialog information\n");
542                 goto error;
543         }
544
545         /* getting presentity uri from Request-URI if initial subscribe - or else from database*/
546         if(to_tag_gen)
547         {
548                 if(parsed_event->parsed!= EVENT_DIALOG)
549                 {
550                         if( parse_sip_msg_uri(msg)< 0)
551                         {
552                                 LM_ERR("failed to parse R-URI\n");
553                                 return -1;
554                         }
555                         if(uandd_to_uri(msg->parsed_uri.user, msg->parsed_uri.host,
556                                         &subs.pres_uri)< 0)
557                         {
558                                 LM_ERR("failed to construct uri from user and domain\n");
559                                 goto error;
560                         }
561                 }
562         }
563         else
564         {
565                 if(get_stored_info(msg, &subs, &reply_code, &reply_str )< 0)
566                 {
567                         LM_ERR("getting stored info\n");
568                         goto error;
569                 }
570                 reason= subs.reason;
571         }       
572
573         /* call event specific subscription handling */
574         if(event->evs_subs_handl)
575         {
576                 if(event->evs_subs_handl(msg)< 0)
577                 {
578                         LM_ERR("in event specific subscription handling\n");
579                         goto error;
580                 }
581         }       
582
583
584         /* if dialog initiation Subscribe - get subscription state */
585         if(to_tag_gen)
586         {
587                 if(!event->req_auth) 
588                         subs.status = ACTIVE_STATUS;
589                 else   
590                 {
591                         /* query in watchers_table */
592                         if(get_db_subs_auth(&subs, &found)< 0)
593                         {
594                                 LM_ERR("getting subscription status from watchers table\n");
595                                 goto error;
596                         }
597                         if(found== 0)
598                         {
599                                 /*default 'pending' status */
600                                 subs.status= PENDING_STATUS;
601                                 subs.reason.s= NULL;
602                                 subs.reason.len= 0;
603                                 /* here a query to xcap server must be done -> new process maybe */
604                         
605                                 if(parse_uri(subs.pres_uri.s, subs.pres_uri.len, &uri)< 0)
606                                 {
607                                         LM_ERR("parsing uri\n");
608                                         goto error;
609
610                                 }
611                                 if(subs.event->get_rules_doc(&uri.user, &uri.host, &subs.auth_rules_doc)< 0)
612                                 {
613                                         LM_ERR("getting rules doc\n");
614                                         goto error;
615                                 }
616                                 
617                                 if(subs.event->get_auth_status(&subs)< 0)
618                                 {
619                                         LM_ERR("in event specific function is_watcher_allowed\n");
620                                         goto error;
621                                 }
622                                 if(get_status_str(subs.status)== NULL)
623                                 {
624                                         LM_ERR("wrong status= %d\n", subs.status);
625                                         goto error;
626                                 }
627
628                                 if(insert_db_subs_auth(&subs)< 0)
629                                 {
630                                         LM_ERR("while inserting record in watchers table\n");
631                                         goto error;
632                                 }
633                         }
634                         else
635                         {
636                                 reason= subs.reason;
637                         }
638                 }
639         }
640
641         /* check if correct status */
642         if(get_status_str(subs.status)== NULL)
643         {
644                 LM_ERR("wrong status\n");
645                 goto error;
646         }
647     LM_DBG("subscription status= %s - %s\n", get_status_str(subs.status), 
648             found==0?"inserated":"found in watcher table");
649         
650         if(update_subscription(msg, &subs, to_tag_gen, &sent_reply) <0)
651         {       
652                 LM_ERR("in update_subscription\n");
653                 goto error;
654         }
655         if(subs.auth_rules_doc)
656         {
657                 pkg_free(subs.auth_rules_doc->s);
658                 pkg_free(subs.auth_rules_doc);
659         }
660         if(reason.s)
661                 pkg_free(reason.s);
662         
663         if(parsed_event->parsed!= EVENT_DIALOG && subs.pres_uri.s)
664                 pkg_free(subs.pres_uri.s);
665         
666         if((!server_address.s) || (server_address.len== 0))
667         {
668                 pkg_free(subs.local_contact.s);
669         }
670         if(subs.record_route.s)
671                 pkg_free(subs.record_route.s);
672
673         return 1;
674
675 bad_event:
676
677         LM_ERR("Missing or unsupported event header field value\n");
678                 
679         if(parsed_event)
680                 LM_ERR("\tevent= %.*s\n",parsed_event->text.len,parsed_event->text.s);
681         
682         reply_code= BAD_EVENT_CODE;
683         reply_str= pu_489_rpl;
684
685 error:
686         
687         if(sent_reply== 0)
688         {
689                 if(send_error_reply(msg, reply_code, reply_str)< 0)
690                 {
691                         LM_ERR("failed to send reply on error case\n");
692                 }
693         }
694
695         if(parsed_event->parsed!= EVENT_DIALOG &&subs.pres_uri.s)
696                 pkg_free(subs.pres_uri.s);
697         
698         if(subs.auth_rules_doc)
699         {
700                 if(subs.auth_rules_doc->s)
701                         pkg_free(subs.auth_rules_doc->s);
702                 pkg_free(subs.auth_rules_doc);
703         }
704         if(reason.s)
705                 pkg_free(reason.s);
706
707         if(((!server_address.s) ||(server_address.len== 0))&& subs.local_contact.s)
708         {
709                 pkg_free(subs.local_contact.s);
710         }
711         if(subs.record_route.s)
712                 pkg_free(subs.record_route.s);
713
714         return -1;
715
716 }
717
718
719 int extract_sdialog_info(subs_t* subs,struct sip_msg* msg, int mexp, int* to_tag_gen)
720 {
721         static char buf[50];
722         str rec_route= {0, 0};
723         int rt  = 0;
724         str* contact= NULL;
725         contact_body_t *b;
726         struct to_body *pto, *pfrom = NULL, TO;
727         int lexpire;
728         str rtag_value;
729         struct sip_uri uri;
730
731         /* examine the expire header field */
732         if(msg->expires && msg->expires->body.len > 0)
733         {
734                 if (!msg->expires->parsed && (parse_expires(msg->expires) < 0))
735                 {
736                         LM_ERR("cannot parse Expires header\n");
737                         goto error;
738                 }
739                 lexpire = ((exp_body_t*)msg->expires->parsed)->val;
740                 LM_DBG("'Expires' header found, value= %d\n", lexpire);
741
742         }
743         else 
744         {
745                 LM_DBG("'expires' not found; default=%d\n",subs->event->default_expires);
746                 lexpire = subs->event->default_expires;
747         }
748         if(lexpire > mexp)
749                 lexpire = mexp;
750
751         subs->expires = lexpire;
752
753         if( msg->to==NULL || msg->to->body.s==NULL)
754         {
755                 LM_ERR("cannot parse TO header\n");
756                 goto error;
757         }
758         /* examine the to header */
759         if(msg->to->parsed != NULL)
760         {
761                 pto = (struct to_body*)msg->to->parsed;
762                 LM_DBG("'To' header ALREADY PARSED: <%.*s>\n",pto->uri.len,pto->uri.s);
763         }
764         else
765         {
766                 memset( &TO , 0, sizeof(TO) );
767                 if( !parse_to(msg->to->body.s,msg->to->body.s + msg->to->body.len + 1, &TO));
768                 {
769                         LM_DBG("'To' header NOT parsed\n");
770                         goto error;
771                 }
772                 pto = &TO;
773         }
774
775         if( pto->parsed_uri.user.s && pto->parsed_uri.host.s &&
776                 pto->parsed_uri.user.len && pto->parsed_uri.host.len)
777         {
778                 subs->to_user = pto->parsed_uri.user;
779                 subs->to_domain = pto->parsed_uri.host;
780         }
781         else
782         {
783                 if(parse_uri(pto->uri.s, pto->uri.len, &uri)< 0)
784                 {
785                         LM_ERR("while parsing uri\n");
786                         goto error;
787                 }
788                 subs->to_user = uri.user;
789                 subs->to_domain = uri.host;
790         }
791
792         /* examine the from header */
793         if (!msg->from || !msg->from->body.s)
794         {
795                 LM_DBG("cannot find 'from' header!\n");
796                 goto error;
797         }
798         if (msg->from->parsed == NULL)
799         {
800                 LM_DBG("'From' header not parsed\n");
801                 /* parsing from header */
802                 if ( parse_from_header( msg )<0 ) 
803                 {
804                         LM_DBG("cannot parse From header\n");
805                         goto error;
806                 }
807         }
808         pfrom = (struct to_body*)msg->from->parsed;
809         
810         if( pfrom->parsed_uri.user.s && pfrom->parsed_uri.host.s && 
811                 pfrom->parsed_uri.user.len && pfrom->parsed_uri.host.len)
812         {
813                 subs->from_user = pfrom->parsed_uri.user;
814                 subs->from_domain = pfrom->parsed_uri.host;
815         }
816         else
817         {
818                 if(parse_uri(pfrom->uri.s, pfrom->uri.len, &uri)< 0)
819                 {
820                         LM_ERR("while parsing uri\n");
821                         goto error;
822                 }
823                 subs->from_user = uri.user;
824                 subs->from_domain = uri.host;
825         }
826
827         if(subs->event->evp->parsed== EVENT_DIALOG)
828         {
829                 subs->pres_uri= pfrom->uri;
830         }
831
832         /*generate to_tag if the message does not have a to_tag*/
833         if (pto->tag_value.s==NULL || pto->tag_value.len==0 )
834         {  
835                 LM_DBG("generating to_tag\n");
836                 *to_tag_gen = 1;
837                 /*generate to_tag then insert it in avp*/
838                 
839                 rtag_value.s = buf;
840                 rtag_value.len = sprintf(rtag_value.s,"%s.%d.%d.%d", to_tag_pref,
841                                 pid, (int)time(NULL), counter);
842                 if(rtag_value.len<= 0)
843                 {
844                         LM_ERR("while creating to_tag\n");
845                         goto error;
846                 }
847         }
848         else
849         {
850                 *to_tag_gen = 0;
851                 rtag_value=pto->tag_value;
852         }
853         subs->to_tag = rtag_value;
854
855         if( msg->callid==NULL || msg->callid->body.s==NULL)
856         {
857                 LM_ERR("cannot parse callid header\n");
858                 goto error;
859         }
860         subs->callid = msg->callid->body;
861
862         if( msg->cseq==NULL || msg->cseq->body.s==NULL)
863         {
864                 LM_ERR("cannot parse cseq header\n");
865                 goto error;
866         }
867         if (str2int( &(get_cseq(msg)->number), &subs->remote_cseq)!=0 )
868         {
869                 LM_ERR("cannot parse cseq number\n");
870                 goto error;
871         }
872         if( msg->contact==NULL || msg->contact->body.s==NULL)
873         {
874                 LM_ERR("cannot parse contact header\n");
875                 goto error;
876         }
877         if( parse_contact(msg->contact) <0 )
878         {
879                 LM_ERR(" cannot parse contact"
880                                 " header\n");
881                 goto error;
882         }
883         b= (contact_body_t* )msg->contact->parsed;
884
885         if(b == NULL)
886         {
887                 LM_ERR("cannot parse contact header\n");
888                 goto error;
889         }
890         subs->contact = b->contacts->uri;
891         
892         LM_DBG("subs->contact= %.*s - len = %d\n",subs->contact.len,
893                         subs->contact.s, subs->contact.len);    
894
895         /*process record route and add it to a string*/
896         if(*to_tag_gen && msg->record_route!=NULL)
897         {
898                 rt = print_rr_body(msg->record_route, &rec_route, 0, 0);
899                 if(rt != 0)
900                 {
901                         LM_ERR("processing the record route [%d]\n", rt);       
902                         rec_route.s=NULL;
903                         rec_route.len=0;
904                 //      goto error;
905                 }
906         }
907         subs->record_route = rec_route;
908                         
909         subs->sockinfo_str= msg->rcv.bind_address->sock_str;
910
911         if( pfrom->tag_value.s ==NULL || pfrom->tag_value.len == 0)
912         {
913                 LM_ERR("no from tag value present\n");
914                 goto error;
915         }
916         subs->from_tag = pfrom->tag_value;
917
918         subs->version = 0;
919         
920         if((!server_address.s) || (server_address.len== 0))
921         {
922                 contact= get_local_contact(msg);
923                 if(contact== NULL)
924                 {
925                         LM_ERR("in function get_local_contact\n");
926                         goto error;
927                 }
928                 subs->local_contact= *contact;
929         }
930         else
931                 subs->local_contact= server_address;
932         
933         return 0;
934         
935 error:
936
937         return -1;
938 }
939
940
941 int get_stored_info(struct sip_msg* msg, subs_t* subs, int* reply_code,
942                 str* reply_str)
943 {       
944         str pres_uri= {0, 0}, reason={0, 0};
945         subs_t* s;
946         int i;
947         unsigned int hash_code;
948
949         /* first try to_user== pres_user and to_domain== pres_domain */
950
951         uandd_to_uri(subs->to_user, subs->to_domain, &pres_uri);
952         if(pres_uri.s== NULL)
953         {
954                 LM_ERR("creating uri from user and domain\n");
955                 return -1;
956         }
957         hash_code= core_hash(&pres_uri, &subs->event->name, shtable_size);
958         lock_get(&subs_htable[hash_code].lock);
959         i= hash_code;
960         s= search_shtable(subs_htable, subs->callid, subs->to_tag,
961                         subs->from_tag, hash_code);
962         if(s)
963         {
964                 goto found_rec;
965         }
966         lock_release(&subs_htable[hash_code].lock);
967
968         pkg_free(pres_uri.s);
969         pres_uri.s= NULL;
970         LM_DBG("record not found using R-URI search iteratively\n");
971         /* take one row at a time */
972         for(i= 0; i< shtable_size; i++)
973         {
974                 lock_get(&subs_htable[i].lock);
975                 s= search_shtable(subs_htable, subs->callid,subs->to_tag,subs->from_tag, i);
976                 if(s && s->event->evp->parsed!= EVENT_DIALOG)
977                 {
978                         pres_uri.s= (char*)pkg_malloc(s->pres_uri.len* sizeof(char));
979                         if(pres_uri.s== NULL)
980                         {
981                                 lock_release(&subs_htable[i].lock);
982                                 ERR_MEM(PKG_MEM_STR);
983                         }
984                         memcpy(pres_uri.s, s->pres_uri.s, s->pres_uri.len);
985                         pres_uri.len= s->pres_uri.len;
986                         goto found_rec;
987                 }
988                 lock_release(&subs_htable[i].lock);
989         }
990
991         if(fallback2db)
992         {
993                 return get_database_info(msg, subs, reply_code, reply_str);     
994         }
995
996         LM_ERR("record not found in hash_table\n");
997         *reply_code= 481;
998         *reply_str= pu_481_rpl;
999
1000         return -1;
1001
1002 found_rec:
1003         
1004         LM_DBG("Record found in hash_table\n");
1005         
1006         if(s->event->evp->parsed!= EVENT_DIALOG)
1007                 subs->pres_uri= pres_uri;
1008         
1009         subs->status= s->status;
1010         if(s->reason.s && s->reason.len)
1011         {       
1012                 reason.s= (char*)pkg_malloc(s->reason.len* sizeof(char));
1013                 if(reason.s== NULL)
1014                 {
1015                         lock_release(&subs_htable[i].lock);
1016                         ERR_MEM(PKG_MEM_STR);
1017                 }
1018                 memcpy(reason.s, s->reason.s, s->reason.len);
1019                 reason.len= s->reason.len;
1020                 subs->reason= reason;
1021         }
1022         if(s->record_route.s && s->record_route.len)
1023         {
1024                 subs->record_route.s= (char*)pkg_malloc
1025                         (s->record_route.len* sizeof(char));
1026                 if(subs->record_route.s== NULL)
1027                 {
1028                         ERR_MEM(PKG_MEM_STR);
1029                 }
1030                 memcpy(subs->record_route.s, s->record_route.s, s->record_route.len);
1031                 subs->record_route.len= s->record_route.len;
1032         }
1033
1034         subs->local_cseq= s->local_cseq;
1035         
1036         if(subs->remote_cseq<= s->remote_cseq)
1037         {
1038                 LM_ERR("wrong sequence number;received: %d - stored: %d\n",
1039                                 subs->remote_cseq, s->remote_cseq);
1040                 
1041                 *reply_code= 400;
1042                 *reply_str= pu_400_rpl;
1043
1044                 lock_release(&subs_htable[i].lock);
1045                 goto error;
1046         }       
1047         lock_release(&subs_htable[i].lock);
1048
1049         return 0;
1050
1051 error:
1052         if(subs->reason.s)
1053                 pkg_free(subs->reason.s);
1054         subs->reason.s= NULL;
1055         if(subs->record_route.s)
1056                 pkg_free(subs->record_route.s);
1057         subs->record_route.s= NULL;
1058         return -1;
1059 }
1060
1061 int get_database_info(struct sip_msg* msg, subs_t* subs, int* reply_code, str* reply_str)
1062 {       
1063         db_key_t query_cols[10];
1064         db_val_t query_vals[10];
1065         db_key_t result_cols[8];
1066         db_res_t *result= NULL;
1067         db_row_t *row ; 
1068         db_val_t *row_vals ;
1069         int n_query_cols = 0;
1070         int n_result_cols = 0;
1071         int remote_cseq_col= 0, local_cseq_col= 0, status_col, reason_col;
1072         int record_route_col;
1073         int pres_uri_col;
1074         unsigned int remote_cseq;
1075         str pres_uri, record_route;
1076         str reason;
1077
1078         query_cols[n_query_cols] = &str_to_user_col;
1079         query_vals[n_query_cols].type = DB_STR;
1080         query_vals[n_query_cols].nul = 0;
1081         query_vals[n_query_cols].val.str_val = subs->to_user;
1082         n_query_cols++;
1083         
1084         query_cols[n_query_cols] = &str_to_domain_col;
1085         query_vals[n_query_cols].type = DB_STR;
1086         query_vals[n_query_cols].nul = 0;
1087         query_vals[n_query_cols].val.str_val = subs->to_domain;
1088         n_query_cols++;
1089
1090         query_cols[n_query_cols] = &str_watcher_username_col;
1091         query_vals[n_query_cols].type = DB_STR;
1092         query_vals[n_query_cols].nul = 0;
1093         query_vals[n_query_cols].val.str_val = subs->from_user;
1094         n_query_cols++;
1095         
1096         query_cols[n_query_cols] = &str_watcher_domain_col;
1097         query_vals[n_query_cols].type = DB_STR;
1098         query_vals[n_query_cols].nul = 0;
1099         query_vals[n_query_cols].val.str_val = subs->from_domain;
1100         n_query_cols++;
1101
1102         query_cols[n_query_cols] = &str_event_col;
1103         query_vals[n_query_cols].type = DB_STR;
1104         query_vals[n_query_cols].nul = 0;
1105         query_vals[n_query_cols].val.str_val = subs->event->name;
1106         n_query_cols++;
1107
1108         query_cols[n_query_cols] = &str_event_id_col;
1109         query_vals[n_query_cols].type = DB_STR;
1110         query_vals[n_query_cols].nul = 0;
1111         if( subs->event_id.s != NULL)
1112         {
1113                 query_vals[n_query_cols].val.str_val.s = subs->event_id.s;
1114                 query_vals[n_query_cols].val.str_val.len = subs->event_id.len;
1115         } else {
1116                 query_vals[n_query_cols].val.str_val.s = "";
1117                 query_vals[n_query_cols].val.str_val.len = 0;
1118         }
1119         n_query_cols++;
1120         
1121         query_cols[n_query_cols] = &str_callid_col;
1122         query_vals[n_query_cols].type = DB_STR;
1123         query_vals[n_query_cols].nul = 0;
1124         query_vals[n_query_cols].val.str_val = subs->callid;
1125         n_query_cols++;
1126
1127         query_cols[n_query_cols] = &str_to_tag_col;
1128         query_vals[n_query_cols].type = DB_STR;
1129         query_vals[n_query_cols].nul = 0;
1130         query_vals[n_query_cols].val.str_val = subs->to_tag;
1131         n_query_cols++;
1132
1133         query_cols[n_query_cols] = &str_from_tag_col;
1134         query_vals[n_query_cols].type = DB_STR;
1135         query_vals[n_query_cols].nul = 0;
1136         query_vals[n_query_cols].val.str_val = subs->from_tag;
1137         n_query_cols++;
1138
1139         result_cols[pres_uri_col=n_result_cols++] = &str_presentity_uri_col;
1140         result_cols[remote_cseq_col=n_result_cols++] = &str_remote_cseq_col;
1141         result_cols[local_cseq_col=n_result_cols++] = &str_local_cseq_col;
1142         result_cols[status_col=n_result_cols++] = &str_status_col;
1143         result_cols[reason_col=n_result_cols++] = &str_reason_col;
1144         result_cols[record_route_col=n_result_cols++] = &str_record_route_col;
1145         
1146         if (pa_dbf.use_table(pa_db, &active_watchers_table) < 0) 
1147         {
1148                 LM_ERR("unsuccessful use_table sql operation\n");
1149                 return -1;
1150         }
1151         
1152         if (pa_dbf.query (pa_db, query_cols, 0, query_vals,
1153                  result_cols, n_query_cols, n_result_cols, 0,  &result) < 0) 
1154         {
1155                 LM_ERR("querying subscription dialog\n");
1156                 if(result)
1157                         pa_dbf.free_result(pa_db, result);
1158                 return -1;
1159         }
1160         if(result== NULL)
1161                 return -1;
1162
1163         if(result && result->n <=0)
1164         {
1165                 LM_ERR("No matching subscription dialog found in database\n");
1166                 
1167                 pa_dbf.free_result(pa_db, result);
1168                 *reply_code= 481;
1169                 *reply_str= pu_481_rpl;
1170
1171                 return -1;
1172         }
1173
1174         row = &result->rows[0];
1175         row_vals = ROW_VALUES(row);
1176         remote_cseq= row_vals[remote_cseq_col].val.int_val;
1177         
1178         if(subs->remote_cseq<= remote_cseq)
1179         {
1180                 LM_ERR("wrong sequence number received: %d - stored: %d\n",
1181                                 subs->remote_cseq, remote_cseq);
1182                 *reply_code= 400;
1183                 *reply_str= pu_400_rpl;
1184                 pa_dbf.free_result(pa_db, result);
1185                 return -1;
1186         }
1187         
1188         subs->status= row_vals[status_col].val.int_val;
1189         reason.s= (char*)row_vals[reason_col].val.string_val;
1190         if(reason.s)
1191         {
1192                 reason.len= strlen(reason.s);
1193                 subs->reason.s= (char*)pkg_malloc(reason.len* sizeof(char));
1194                 if(subs->reason.s== NULL)
1195                 {
1196                         ERR_MEM(PKG_MEM_STR);
1197                 }
1198                 memcpy(subs->reason.s, reason.s, reason.len);
1199                 subs->reason.len= reason.len;
1200         }
1201
1202         subs->local_cseq= row_vals[local_cseq_col].val.int_val;
1203         
1204         if(subs->event->evp->parsed!= EVENT_DIALOG)
1205         {
1206                 pres_uri.s= (char*)row_vals[pres_uri_col].val.string_val;
1207                 pres_uri.len= strlen(pres_uri.s);
1208                 subs->pres_uri.s= (char*)pkg_malloc(pres_uri.len* sizeof(char));
1209                 if(subs->pres_uri.s== NULL)
1210                 {       
1211                         if(subs->reason.s)
1212                                 pkg_free(subs->reason.s);
1213                         ERR_MEM(PKG_MEM_STR);
1214                 }
1215                 memcpy(subs->pres_uri.s, pres_uri.s, pres_uri.len);
1216                 subs->pres_uri.len= pres_uri.len;
1217         }
1218
1219         record_route.s= (char*)row_vals[record_route_col].val.string_val;
1220         if(record_route.s)
1221         {
1222                 record_route.len= strlen(record_route.s);
1223                 subs->record_route.s= (char*)pkg_malloc(record_route.len*sizeof(char));
1224                 if(subs->record_route.s== NULL)
1225                 {
1226                         ERR_MEM(PKG_MEM_STR);
1227                 }
1228                 memcpy(subs->record_route.s, record_route.s, record_route.len);
1229                 subs->record_route.len= record_route.len;
1230         }
1231
1232         pa_dbf.free_result(pa_db, result);
1233         result= NULL;
1234
1235         return 0;
1236 error:
1237         if(result)
1238                 pa_dbf.free_result(pa_db, result);
1239
1240         return -1;
1241
1242 }
1243
1244
1245 int handle_expired_subs(subs_t* s)
1246 {
1247         /* send Notify with state=terminated;reason=timeout */
1248         
1249         s->status= TERMINATED_STATUS;
1250         s->reason.s= "timeout";
1251         s->reason.len= 7;
1252         s->expires= 0;
1253
1254         if(send_notify_request(s, NULL, NULL, 1)< 0)
1255         {
1256                 LM_ERR("send Notify not successful\n");
1257                 return -1;
1258         }
1259         
1260         return 0;
1261
1262 }
1263
1264 void timer_db_update(unsigned int ticks,void *param)
1265 {       
1266         int no_lock=0;
1267
1268         if(ticks== 0 && param == NULL)
1269                 no_lock= 1;
1270         
1271         if(pa_dbf.use_table(pa_db, &active_watchers_table)< 0)
1272         {
1273                 LM_ERR("sql use table failed\n");
1274                 return;
1275         }
1276
1277         update_db_subs(pa_db, pa_dbf, subs_htable, 
1278                         shtable_size, no_lock, handle_expired_subs);
1279
1280 }
1281
1282 void update_db_subs(db_con_t *db,db_func_t dbf, shtable_t hash_table,
1283         int htable_size, int no_lock, handle_expired_func_t handle_expired_func)
1284 {       
1285         db_key_t query_cols[22], update_cols[7];
1286         db_val_t query_vals[22], update_vals[7];
1287         db_op_t update_ops[1];
1288         subs_t* del_s;
1289         int pres_uri_col, to_user_col, to_domain_col, from_user_col, from_domain_col,
1290                 callid_col, totag_col, fromtag_col, event_col,status_col, event_id_col, 
1291                 local_cseq_col, remote_cseq_col, expires_col, record_route_col, 
1292                 contact_col, local_contact_col, version_col,socket_info_col,reason_col;
1293         int u_expires_col, u_local_cseq_col, u_remote_cseq_col, u_version_col, 
1294                 u_reason_col, u_status_col; 
1295         int i;
1296         subs_t* s= NULL, *prev_s= NULL;
1297         int n_query_cols= 0, n_update_cols= 0;
1298         int n_query_update;
1299
1300         query_cols[pres_uri_col= n_query_cols] =&str_presentity_uri_col;
1301         query_vals[pres_uri_col].type = DB_STR;
1302         query_vals[pres_uri_col].nul = 0;
1303         n_query_cols++;
1304         
1305         query_cols[callid_col= n_query_cols] =&str_callid_col;
1306         query_vals[callid_col].type = DB_STR;
1307         query_vals[callid_col].nul = 0;
1308         n_query_cols++;
1309
1310         query_cols[totag_col= n_query_cols] =&str_to_tag_col;
1311         query_vals[totag_col].type = DB_STR;
1312         query_vals[totag_col].nul = 0;
1313         n_query_cols++;
1314
1315         query_cols[fromtag_col= n_query_cols] =&str_from_tag_col;
1316         query_vals[fromtag_col].type = DB_STR;
1317         query_vals[fromtag_col].nul = 0;
1318         n_query_cols++;
1319
1320         n_query_update= n_query_cols;
1321
1322         query_cols[to_user_col= n_query_cols] =&str_to_user_col;
1323         query_vals[to_user_col].type = DB_STR;
1324         query_vals[to_user_col].nul = 0;
1325         n_query_cols++;
1326
1327         query_cols[to_domain_col= n_query_cols] =&str_to_domain_col;
1328         query_vals[to_domain_col].type = DB_STR;
1329         query_vals[to_domain_col].nul = 0;
1330         n_query_cols++;
1331         
1332         query_cols[from_user_col= n_query_cols] =&str_watcher_username_col;
1333         query_vals[from_user_col].type = DB_STR;
1334         query_vals[from_user_col].nul = 0;
1335         n_query_cols++;
1336
1337         query_cols[from_domain_col= n_query_cols] =&str_watcher_domain_col;
1338         query_vals[from_domain_col].type = DB_STR;
1339         query_vals[from_domain_col].nul = 0;
1340         n_query_cols++;
1341
1342         query_cols[event_col= n_query_cols] =&str_event_col;
1343         query_vals[event_col].type = DB_STR;
1344         query_vals[event_col].nul = 0;
1345         n_query_cols++; 
1346
1347         query_cols[event_id_col= n_query_cols] =&str_event_id_col;
1348         query_vals[event_id_col].type = DB_STR;
1349         query_vals[event_id_col].nul = 0;
1350         n_query_cols++;
1351
1352         query_cols[local_cseq_col= n_query_cols]=&str_local_cseq_col;
1353         query_vals[local_cseq_col].type = DB_INT;
1354         query_vals[local_cseq_col].nul = 0;
1355         n_query_cols++;
1356
1357         query_cols[remote_cseq_col= n_query_cols]=&str_remote_cseq_col;
1358         query_vals[remote_cseq_col].type = DB_INT;
1359         query_vals[remote_cseq_col].nul = 0;
1360         n_query_cols++;
1361
1362         query_cols[expires_col= n_query_cols] =&str_expires_col;
1363         query_vals[expires_col].type = DB_INT;
1364         query_vals[expires_col].nul = 0;
1365         n_query_cols++;
1366
1367         query_cols[status_col= n_query_cols] =&str_status_col;
1368         query_vals[status_col].type = DB_INT;
1369         query_vals[status_col].nul = 0;
1370         n_query_cols++;
1371
1372         query_cols[reason_col= n_query_cols] =&str_reason_col;
1373         query_vals[reason_col].type = DB_STR;
1374         query_vals[reason_col].nul = 0;
1375         n_query_cols++;
1376
1377         query_cols[record_route_col= n_query_cols] =&str_record_route_col;
1378         query_vals[record_route_col].type = DB_STR;
1379         query_vals[record_route_col].nul = 0;
1380         n_query_cols++;
1381         
1382         query_cols[contact_col= n_query_cols] =&str_contact_col;
1383         query_vals[contact_col].type = DB_STR;
1384         query_vals[contact_col].nul = 0;
1385         n_query_cols++;
1386
1387         query_cols[local_contact_col= n_query_cols] =&str_local_contact_col;
1388         query_vals[local_contact_col].type = DB_STR;
1389         query_vals[local_contact_col].nul = 0;
1390         n_query_cols++;
1391
1392         query_cols[socket_info_col= n_query_cols] =&str_socket_info_col;
1393         query_vals[socket_info_col].type = DB_STR;
1394         query_vals[socket_info_col].nul = 0;
1395         n_query_cols++;
1396
1397         query_cols[version_col= n_query_cols]=&str_version_col;
1398         query_vals[version_col].type = DB_INT;
1399         query_vals[version_col].nul = 0;
1400         n_query_cols++;
1401
1402         /* cols and values used for update */
1403         update_cols[u_expires_col= n_update_cols]= &str_expires_col;
1404         update_vals[u_expires_col].type = DB_INT;
1405         update_vals[u_expires_col].nul = 0;
1406         n_update_cols++;
1407
1408         update_cols[u_status_col= n_update_cols]= &str_status_col;
1409         update_vals[u_status_col].type = DB_INT;
1410         update_vals[u_status_col].nul = 0;
1411         n_update_cols++;
1412
1413         update_cols[u_reason_col= n_update_cols]= &str_reason_col;
1414         update_vals[u_reason_col].type = DB_STR;
1415         update_vals[u_reason_col].nul = 0;
1416         n_update_cols++;
1417
1418         update_cols[u_remote_cseq_col= n_update_cols]= &str_remote_cseq_col;
1419         update_vals[u_remote_cseq_col].type = DB_INT;
1420         update_vals[u_remote_cseq_col].nul = 0;
1421         n_update_cols++;
1422
1423         update_cols[u_local_cseq_col= n_update_cols]= &str_local_cseq_col;
1424         update_vals[u_local_cseq_col].type = DB_INT;
1425         update_vals[u_local_cseq_col].nul = 0;
1426         n_update_cols++;
1427         
1428         update_cols[u_version_col= n_update_cols]= &str_version_col;
1429         update_vals[u_version_col].type = DB_INT;
1430         update_vals[u_version_col].nul = 0;
1431         n_update_cols++;
1432
1433
1434         if(db== NULL)
1435         {
1436                 LM_ERR("null database connection\n");
1437                 return;
1438         }
1439         for(i=0; i<htable_size; i++) 
1440         {
1441                 if(!no_lock)
1442                         lock_get(&hash_table[i].lock);  
1443
1444                 prev_s= hash_table[i].entries;
1445                 s= prev_s->next;
1446         
1447                 while(s)
1448                 {
1449                         printf_subs(s);
1450                         if(s->expires < (int)time(NULL)- 50)    
1451                         {
1452                                 LM_DBG("Found expired record\n");
1453                                 if(!no_lock)
1454                                 {
1455                                         if(handle_expired_func(s)< 0)
1456                                         {
1457                                                 LM_ERR("in function handle_expired_record\n");
1458                                                 if(!no_lock)
1459                                                         lock_release(&hash_table[i].lock);      
1460                                                 return ;
1461                                         }
1462                                 }
1463                                 del_s= s;       
1464                                 s= s->next;
1465                                 prev_s->next= s;
1466                                 
1467                                 shm_free(del_s);
1468                                 continue;
1469                         }
1470                         switch(s->db_flag)
1471                         {
1472                                 case NO_UPDATEDB_FLAG:
1473                                 {
1474                                         LM_DBG("NO_UPDATEDB_FLAG\n");
1475                                         break;                    
1476                                 }
1477                                 case UPDATEDB_FLAG:
1478                                 {
1479                                         LM_DBG("UPDATEDB_FLAG\n");
1480
1481                                         query_vals[pres_uri_col].val.str_val= s->pres_uri;
1482                                         query_vals[callid_col].val.str_val= s->callid;
1483                                         query_vals[totag_col].val.str_val= s->to_tag;
1484                                         query_vals[fromtag_col].val.str_val= s->from_tag;
1485                                 
1486                                         update_vals[u_expires_col].val.int_val= s->expires;
1487                                         update_vals[u_local_cseq_col].val.int_val= s->local_cseq;
1488                                         update_vals[u_remote_cseq_col].val.int_val= s->remote_cseq;
1489                                         update_vals[u_version_col].val.int_val= s->version;
1490                                         update_vals[u_status_col].val.int_val= s->status;
1491                                         update_vals[u_reason_col].val.str_val= s->reason;
1492
1493                                         if(dbf.update(db, query_cols, 0, query_vals, update_cols, 
1494                                                                 update_vals, n_query_update, n_update_cols)< 0)
1495                                         {
1496                                                 LM_ERR("updating in database\n");
1497                                                 if(!no_lock)
1498                                                         lock_release(&hash_table[i].lock);      
1499                                                 return ;
1500                                         }
1501                                         break;
1502                                 }
1503                                 case  INSERTDB_FLAG:
1504                                 {
1505                                         LM_DBG("INSERTDB_FLAG\n");
1506
1507                                         query_vals[pres_uri_col].val.str_val= s->pres_uri;
1508                                         query_vals[callid_col].val.str_val= s->callid;
1509                                         query_vals[totag_col].val.str_val= s->to_tag;
1510                                         query_vals[fromtag_col].val.str_val= s->from_tag;
1511                                         query_vals[to_user_col].val.str_val = s->to_user;
1512                                         query_vals[to_domain_col].val.str_val = s->to_domain;
1513                                         query_vals[from_user_col].val.str_val = s->from_user;
1514                                         query_vals[from_domain_col].val.str_val = s->from_domain;
1515                                         query_vals[event_col].val.str_val = s->event->name;
1516                                         query_vals[event_id_col].val.str_val = s->event_id;
1517                                         query_vals[local_cseq_col].val.int_val= s->local_cseq;
1518                                         query_vals[remote_cseq_col].val.int_val= s->remote_cseq;
1519                                         query_vals[expires_col].val.int_val = s->expires;
1520                                         query_vals[record_route_col].val.str_val = s->record_route;
1521                                         query_vals[contact_col].val.str_val = s->contact;
1522                                         query_vals[local_contact_col].val.str_val = s->local_contact;
1523                                         query_vals[version_col].val.int_val= s->version;
1524                                         query_vals[status_col].val.int_val= s->status;
1525                                         query_vals[reason_col].val.str_val= s->reason;
1526                                         query_vals[socket_info_col].val.str_val= s->sockinfo_str;
1527                                 
1528                                         if(dbf.insert(db,query_cols,query_vals,n_query_cols )<0)
1529                                         {
1530                                                 LM_ERR("unsuccessful sql insert\n");
1531                                                 if(!no_lock)
1532                                                         lock_release(&hash_table[i].lock);
1533                                                 return ;
1534                                         }
1535                                         break;                                                                          
1536                                 }
1537
1538                         }
1539                         s->db_flag= NO_UPDATEDB_FLAG;   
1540                         prev_s= s;
1541                         s= s->next;
1542                 }
1543                 if(!no_lock)
1544                         lock_release(&hash_table[i].lock);      
1545         }
1546
1547         update_vals[0].val.int_val= (int)time(NULL)- 10;
1548         update_ops[0]= OP_LT;
1549         if(dbf.delete(db, update_cols, update_ops, update_vals, 1) < 0)
1550         {
1551                 LM_ERR("deleting expired information from database\n");
1552         }
1553
1554 }
1555
1556 int restore_db_subs(void)
1557 {
1558         db_key_t result_cols[22]; 
1559         db_res_t *res= NULL;
1560         db_row_t *row = NULL;   
1561         db_val_t *row_vals= NULL;
1562         int i;
1563         int n_result_cols= 0;
1564         int pres_uri_col, expires_col, from_user_col, from_domain_col,to_user_col; 
1565         int callid_col,totag_col,fromtag_col,to_domain_col,sockinfo_col,reason_col;
1566         int event_col,contact_col,record_route_col, event_id_col, status_col;
1567         int remote_cseq_col, local_cseq_col, local_contact_col, version_col;
1568         subs_t s;
1569         str ev_sname;
1570         pres_ev_t* event= NULL;
1571         event_t parsed_event;
1572         unsigned int expires;
1573         unsigned int hash_code;
1574
1575         result_cols[pres_uri_col=n_result_cols++]       =&str_presentity_uri_col;
1576         result_cols[expires_col=n_result_cols++]=&str_expires_col;
1577         result_cols[event_col=n_result_cols++]  =&str_event_col;
1578         result_cols[event_id_col=n_result_cols++]=&str_event_id_col;
1579         result_cols[to_user_col=n_result_cols++]        =&str_to_user_col;
1580         result_cols[to_domain_col=n_result_cols++]      =&str_to_domain_col;
1581         result_cols[from_user_col=n_result_cols++]      =&str_watcher_username_col;
1582         result_cols[from_domain_col=n_result_cols++]=&str_watcher_domain_col;
1583         result_cols[callid_col=n_result_cols++] =&str_callid_col;
1584         result_cols[totag_col=n_result_cols++]  =&str_to_tag_col;
1585         result_cols[fromtag_col=n_result_cols++]=&str_from_tag_col;
1586         result_cols[local_cseq_col= n_result_cols++]    =&str_local_cseq_col;
1587         result_cols[remote_cseq_col= n_result_cols++]   =&str_remote_cseq_col;
1588         result_cols[record_route_col= n_result_cols++]  =&str_record_route_col;
1589         result_cols[sockinfo_col= n_result_cols++]      =&str_socket_info_col;
1590         result_cols[contact_col= n_result_cols++]       =&str_contact_col;
1591         result_cols[local_contact_col= n_result_cols++] =&str_local_contact_col;
1592         result_cols[version_col= n_result_cols++]       =&str_version_col;
1593         result_cols[status_col= n_result_cols++]        =&str_status_col;
1594         result_cols[reason_col= n_result_cols++]        =&str_reason_col;
1595         
1596         if(!pa_db)
1597         {
1598                 LM_ERR("null database connection\n");
1599                 return -1;
1600         }
1601
1602         if(pa_dbf.use_table(pa_db, &active_watchers_table)< 0)
1603         {
1604                 LM_ERR("in use table\n");
1605                 return -1;
1606         }
1607
1608         if(pa_dbf.query(pa_db,0, 0, 0, result_cols,0, n_result_cols, 0,&res)< 0)
1609         {
1610                 LM_ERR("while querrying table\n");
1611                 if(res)
1612                 {
1613                         pa_dbf.free_result(pa_db, res);
1614                         res = NULL;
1615                 }
1616                 return -1;
1617         }
1618         if(res== NULL)
1619                 return -1;
1620
1621         if(res->n<=0)
1622         {
1623                 LM_INFO("The query returned no result\n");
1624                 pa_dbf.free_result(pa_db, res);
1625                 res = NULL;
1626                 return 0;
1627         }
1628
1629         LM_DBG("found %d db entries\n", res->n);
1630
1631         for(i =0 ; i< res->n ; i++)
1632         {
1633                 row = &res->rows[i];
1634                 row_vals = ROW_VALUES(row);
1635                 memset(&s, 0, sizeof(subs_t));
1636
1637                 expires= row_vals[expires_col].val.int_val;
1638                 
1639                 if(expires< (int)time(NULL))
1640                         continue;
1641         
1642                 s.pres_uri.s= (char*)row_vals[pres_uri_col].val.string_val;
1643                 s.pres_uri.len= strlen(s.pres_uri.s);
1644                 
1645                 s.to_user.s=(char*)row_vals[to_user_col].val.string_val;
1646                 s.to_user.len= strlen(s.to_user.s);
1647
1648                 s.to_domain.s=(char*)row_vals[to_domain_col].val.string_val;
1649                 s.to_domain.len= strlen(s.to_domain.s);
1650
1651                 s.from_user.s=(char*)row_vals[from_user_col].val.string_val;
1652                 s.from_user.len= strlen(s.from_user.s);
1653                 
1654                 s.from_domain.s=(char*)row_vals[from_domain_col].val.string_val;
1655                 s.from_domain.len= strlen(s.from_domain.s);
1656
1657                 s.to_tag.s=(char*)row_vals[totag_col].val.string_val;
1658                 s.to_tag.len= strlen(s.to_tag.s);
1659
1660                 s.from_tag.s=(char*)row_vals[fromtag_col].val.string_val;
1661                 s.from_tag.len= strlen(s.from_tag.s);
1662
1663                 s.callid.s=(char*)row_vals[callid_col].val.string_val;
1664                 s.callid.len= strlen(s.callid.s);
1665
1666                 ev_sname.s= (char*)row_vals[event_col].val.string_val;
1667                 ev_sname.len= strlen(ev_sname.s);
1668                 
1669                 event= contains_event(&ev_sname, &parsed_event);
1670                 if(event== NULL)
1671                 {
1672                         LM_DBG("insert a new event structure in the list waiting"
1673                                         " to be filled in\n");
1674         
1675                         /*insert a new event structure in the list waiting to be filled in*/
1676                         event= (pres_ev_t*)shm_malloc(sizeof(pres_ev_t));
1677                         if(event== NULL)
1678                         {
1679                                 free_event_params(parsed_event.params, PKG_MEM_TYPE);
1680                                 ERR_MEM(SHM_MEM_STR);
1681                         }
1682                         memset(event, 0, sizeof(pres_ev_t));
1683                         event->name.s= (char*)shm_malloc(ev_sname.len* sizeof(char));
1684                         if(event->name.s== NULL)
1685                         {
1686                                 free_event_params(parsed_event.params, PKG_MEM_TYPE);
1687                                 ERR_MEM(SHM_MEM_STR);
1688                         }
1689                         memcpy(event->name.s,ev_sname.s, ev_sname.len);
1690                         event->name.len= ev_sname.len;
1691                         
1692                         event->evp= shm_copy_event(&parsed_event);
1693                         if(event->evp== NULL)
1694                         {
1695                                 LM_ERR("ERROR copying event_t structure\n");
1696                                 free_event_params(parsed_event.params, PKG_MEM_TYPE);
1697                                 goto error;
1698                         }
1699                         event->next= EvList->events;
1700                         EvList->events= event;
1701                 }
1702                         
1703                 free_event_params(parsed_event.params, PKG_MEM_TYPE);
1704
1705                 s.event= event;
1706
1707                 s.event_id.s=(char*)row_vals[event_id_col].val.string_val;
1708                 if(s.event_id.s)
1709                         s.event_id.len= strlen(s.event_id.s);
1710
1711                 s.remote_cseq= row_vals[remote_cseq_col].val.int_val;
1712                 s.local_cseq= row_vals[local_cseq_col].val.int_val;
1713                 s.version= row_vals[version_col].val.int_val;
1714                 
1715                 s.expires= expires- (int)time(NULL);
1716                 s.status= row_vals[status_col].val.int_val;
1717
1718                 s.reason.s= (char*)row_vals[reason_col].val.string_val;
1719                 if(s.reason.s)
1720                         s.reason.len= strlen(s.reason.s);
1721
1722                 s.contact.s=(char*)row_vals[contact_col].val.string_val;
1723                 s.contact.len= strlen(s.contact.s);
1724
1725                 s.local_contact.s=(char*)row_vals[local_contact_col].val.string_val;
1726                 s.local_contact.len= strlen(s.local_contact.s);
1727         
1728                 s.record_route.s=(char*)row_vals[record_route_col].val.string_val;
1729                 if(s.record_route.s)
1730                         s.record_route.len= strlen(s.record_route.s);
1731         
1732                 s.sockinfo_str.s=(char*)row_vals[sockinfo_col].val.string_val;
1733                 s.sockinfo_str.len= strlen(s.sockinfo_str.s);
1734
1735                 hash_code= core_hash(&s.pres_uri, &s.event->name, shtable_size);
1736                 if(insert_shtable(subs_htable, hash_code, &s)< 0)
1737                 {
1738                         LM_ERR("adding new record in hash table\n");
1739                         goto error;
1740                 }
1741         }
1742
1743         pa_dbf.free_result(pa_db, res);
1744
1745         /* delete all records */
1746         if(pa_dbf.delete(pa_db, 0,0,0,0)< 0)
1747         {
1748                 LM_ERR("deleting all records from database table\n");
1749                 return -1;
1750         }
1751
1752         return 0;
1753
1754 error:
1755         if(res)
1756                 pa_dbf.free_result(pa_db, res);
1757         return -1;
1758
1759 }
1760
1761 int refresh_watcher(str* pres_uri, str* watcher_uri, str* event, 
1762                 int status, str* reason)
1763 {
1764         unsigned int hash_code;
1765         subs_t* s, *s_copy;
1766         pres_ev_t* ev;          
1767         struct sip_uri uri;
1768         str user, domain;
1769         /* refresh status in subs_htable and send notify */
1770
1771         ev=     contains_event(event, NULL);
1772         if(ev== NULL)
1773         {
1774                 LM_ERR("while searching event in list\n");
1775                 return -1;
1776         }
1777
1778         if(parse_uri(watcher_uri->s, watcher_uri->len, &uri)< 0)
1779         {
1780                 LM_ERR("parsing uri\n");
1781                 return -1;
1782         }
1783         user= uri.user;
1784         domain= uri.host;
1785
1786         hash_code= core_hash(pres_uri, event, shtable_size);
1787
1788         lock_get(&subs_htable[hash_code].lock);
1789
1790         s= subs_htable[hash_code].entries->next;
1791
1792         while(s)
1793         {
1794                 if(s->event== ev && s->pres_uri.len== pres_uri->len &&
1795                         strncmp(s->pres_uri.s, pres_uri->s, pres_uri->len)== 0 &&
1796                         s->from_user.len==user.len && strncmp(s->from_user.s,user.s, user.len)==0 &&
1797                         s->from_domain.len== domain.len && 
1798                         strncmp(s->from_domain.s, domain.s, domain.len)== 0)
1799                 {
1800                         s->status= status;
1801                         if(reason)
1802                                 s->reason= *reason;
1803                         
1804                         s_copy= mem_copy_subs(s, PKG_MEM_TYPE);
1805                         if(s_copy== NULL)
1806                         {
1807                                 LM_ERR("copying subs_t\n");
1808                                 lock_release(&subs_htable[hash_code].lock);
1809                                 return -1;
1810                         }
1811                         lock_release(&subs_htable[hash_code].lock);
1812                         if(notify(s_copy, NULL, NULL, 0)< 0)
1813                         {
1814                                 LM_ERR("in notify function\n");
1815                                 pkg_free(s_copy);
1816                                 return -1;
1817                         }
1818                         pkg_free(s_copy);
1819                         lock_get(&subs_htable[hash_code].lock);
1820                 }
1821                 s= s->next;
1822         }
1823         return 0;
1824 }
1825
1826 int get_db_subs_auth(subs_t* subs, int* found)
1827 {
1828         db_key_t db_keys[5];
1829         db_val_t db_vals[5];
1830         int n_query_cols= 0; 
1831         db_key_t result_cols[3];
1832         db_res_t *result = NULL;
1833         db_row_t *row ; 
1834         db_val_t *row_vals ;
1835
1836         db_keys[n_query_cols] =&str_presentity_uri_col;
1837         db_vals[n_query_cols].type = DB_STR;
1838         db_vals[n_query_cols].nul = 0;
1839         db_vals[n_query_cols].val.str_val= subs->pres_uri;
1840         n_query_cols++;
1841
1842         db_keys[n_query_cols] =&str_watcher_username_col;
1843         db_vals[n_query_cols].type = DB_STR;
1844         db_vals[n_query_cols].nul = 0;
1845         db_vals[n_query_cols].val.str_val = subs->from_user;
1846         n_query_cols++;
1847
1848         db_keys[n_query_cols] =&str_watcher_domain_col;
1849         db_vals[n_query_cols].type = DB_STR;
1850         db_vals[n_query_cols].nul = 0;
1851         db_vals[n_query_cols].val.str_val = subs->from_domain;
1852         n_query_cols++;
1853         
1854         db_keys[n_query_cols] =&str_event_col;
1855         db_vals[n_query_cols].type = DB_STR;
1856         db_vals[n_query_cols].nul = 0;
1857         db_vals[n_query_cols].val.str_val = subs->event->name;
1858         n_query_cols++;
1859
1860         result_cols[0] = &str_status_col;
1861         result_cols[1] = &str_reason_col;
1862         
1863         if(pa_dbf.use_table(pa_db, &watchers_table)< 0)
1864         {
1865                 LM_ERR("in use table\n");
1866                 return -1;
1867         }       
1868
1869         if(pa_dbf.query(pa_db, db_keys, 0, db_vals, result_cols,
1870                                         n_query_cols, 2, 0, &result )< 0)
1871         {
1872                 LM_ERR("while querying watchers table\n");
1873                 if(result)
1874                         pa_dbf.free_result(pa_db, result);
1875                 return -1;
1876         }
1877         if(result== NULL)
1878                 return -1;
1879         
1880         if(result->n<= 0)
1881         {
1882                 *found= 0;
1883                 pa_dbf.free_result(pa_db, result);
1884                 return 0;
1885         }
1886
1887         *found= 1;
1888         row = &result->rows[0];
1889         row_vals = ROW_VALUES(row);
1890         subs->status= row_vals[0].val.int_val;
1891
1892         if(row_vals[1].val.string_val)
1893         {
1894
1895                 subs->reason.len= strlen(row_vals[1].val.string_val);
1896                 if(subs->reason.len== 0)
1897                         subs->reason.s= NULL;
1898                 else
1899                 {
1900                         subs->reason.s= (char*)pkg_malloc(subs->reason.len*sizeof(char));
1901                         if(subs->reason.s== NULL)
1902                         {
1903                                 pa_dbf.free_result(pa_db, result);
1904                                 ERR_MEM(PKG_MEM_STR);
1905                         }               
1906                         memcpy(subs->reason.s, row_vals[1].val.string_val, subs->reason.len);
1907                 }
1908         }
1909         
1910         pa_dbf.free_result(pa_db, result);
1911         return 0;
1912 error:
1913         return -1;
1914 }       
1915
1916 int insert_db_subs_auth(subs_t* subs)
1917 {
1918         db_key_t db_keys[10];
1919         db_val_t db_vals[10];
1920         int n_query_cols= 0; 
1921
1922         db_keys[n_query_cols] =&str_presentity_uri_col;
1923         db_vals[n_query_cols].type = DB_STR;
1924         db_vals[n_query_cols].nul = 0;
1925         db_vals[n_query_cols].val.str_val= subs->pres_uri;
1926         n_query_cols++;
1927
1928         db_keys[n_query_cols] =&str_watcher_username_col;
1929         db_vals[n_query_cols].type = DB_STR;
1930         db_vals[n_query_cols].nul = 0;
1931         db_vals[n_query_cols].val.str_val = subs->from_user;
1932         n_query_cols++;
1933
1934         db_keys[n_query_cols] =&str_watcher_domain_col;
1935         db_vals[n_query_cols].type = DB_STR;
1936         db_vals[n_query_cols].nul = 0;
1937         db_vals[n_query_cols].val.str_val = subs->from_domain;
1938         n_query_cols++;
1939         
1940         db_keys[n_query_cols] =&str_event_col;
1941         db_vals[n_query_cols].type = DB_STR;
1942         db_vals[n_query_cols].nul = 0;
1943         db_vals[n_query_cols].val.str_val = subs->event->name;
1944         n_query_cols++;
1945
1946         db_keys[n_query_cols] =&str_status_col;
1947         db_vals[n_query_cols].type = DB_INT;
1948         db_vals[n_query_cols].nul = 0;
1949         db_vals[n_query_cols].val.int_val = subs->status;
1950         n_query_cols++;
1951                                                                 
1952         db_keys[n_query_cols] = &str_inserted_time_col;
1953         db_vals[n_query_cols].type = DB_INT;
1954         db_vals[n_query_cols].nul = 0;
1955         db_vals[n_query_cols].val.int_val= (int)time(NULL);
1956         n_query_cols++;
1957         
1958         if(subs->reason.s && subs->reason.len)
1959         {
1960                 db_keys[n_query_cols] =&str_reason_col;
1961                 db_vals[n_query_cols].type = DB_STR;
1962                 db_vals[n_query_cols].nul = 0;
1963                 db_vals[n_query_cols].val.str_val = subs->reason;
1964                 n_query_cols++; 
1965         }       
1966         
1967         if (pa_dbf.use_table(pa_db, &watchers_table) < 0) 
1968         {
1969                 LM_ERR("in use_table\n");
1970                 return -1;
1971         }
1972
1973         if(pa_dbf.insert(pa_db, db_keys, db_vals, n_query_cols )< 0)
1974         {       
1975                 LM_ERR("in sql insert\n");
1976                 return -1;
1977         }
1978
1979         return 0;
1980 }