- fixed an error in the last commit
[sip-router] / modules_k / presence / subscribe.c
1 /*
2  * $Id$
3  *
4  * presence module - presence server implementation
5  *
6  * Copyright (C) 2006 Voice Sistem S.R.L.
7  *
8  * This file is part of openser, a free SIP serves.
9  *
10  * openser is free software; you can redistribute it and/or modify
11  * it under the terms of the GNU General Public License as published by
12  * the Free Software Foundation; either version 2 of the License, or
13  * (at your option) any later version
14  *
15  * openser is distributed in the hope that it will be useful,
16  * but WITHOUT ANY WARRANTY; without even the implied warranty of
17  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
18  * GNU General Public License for more details.
19  *
20  * You should have received a copy of the GNU General Public License 
21  * along with this program; if not, write to the Free Software 
22  * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
23  *
24  * History:
25  * --------
26  *  2006-08-15  initial version (anca)
27  */
28
29 #include "../../ut.h"
30 #include "../../usr_avp.h"
31 #include "../../data_lump_rpl.h"
32 #include "../../parser/parse_expires.h"
33 #include "../../parser/parse_event.h"
34 #include "../../parser/contact/parse_contact.h"
35 #include "presence.h"
36 #include "subscribe.h"
37 #include "utils_func.h"
38 #include "notify.h"
39 #include "../pua/hash.h"
40
41 int get_stored_info(struct sip_msg* msg, subs_t* subs, int* error_ret, str* reply_str);
42 int get_database_info(struct sip_msg* msg, subs_t* subs, int* error_ret, str* reply_str);
43 int refresh_watcher(str* pres_uri, str* watcher_uri, str* event, 
44                 int status, str* reason);
45
46 int get_db_subs_auth(subs_t* subs, int* found);
47 int insert_db_subs_auth(subs_t* subs);
48
49 static str su_200_rpl  = str_init("OK");
50 static str pu_481_rpl  = str_init("Subscription does not exist");
51 static str pu_400_rpl  = str_init("Bad request");
52 static str pu_500_rpl  = str_init("Server Internal Error");
53 static str pu_489_rpl  = str_init("Bad Event");
54
55
56 int send_2XX_reply(struct sip_msg * msg, int reply_code, int lexpire,
57                 str *rtag, str* local_contact)
58 {
59         static str hdr_append;
60         
61         hdr_append.s = (char *)pkg_malloc( sizeof(char)*(local_contact->len+ 128));
62         if(hdr_append.s == NULL)
63         {
64                 ERR_MEM(PKG_MEM_STR);
65         }
66         hdr_append.len = sprintf(hdr_append.s, "Expires: %d\r\n", lexpire);     
67         
68         strncpy(hdr_append.s+hdr_append.len ,"Contact: <", 10);
69         hdr_append.len += 10;
70         strncpy(hdr_append.s+hdr_append.len, local_contact->s, local_contact->len);
71         hdr_append.len+= local_contact->len;
72         strncpy(hdr_append.s+hdr_append.len, ">", 1);
73         hdr_append.len += 1;
74         strncpy(hdr_append.s+hdr_append.len, CRLF, CRLF_LEN);
75         hdr_append.len += CRLF_LEN;
76
77         hdr_append.s[hdr_append.len]= '\0';
78         
79         if (add_lump_rpl( msg, hdr_append.s, hdr_append.len, LUMP_RPL_HDR)==0 )
80         {
81                 LM_ERR("unable to add lump_rl\n");
82                 goto error;
83         }
84
85         if( slb.reply_dlg( msg, reply_code, &su_200_rpl, rtag)== -1)
86         {
87                 LM_ERR("sending reply\n");
88                 goto error;
89         }
90         
91         pkg_free(hdr_append.s);
92         return 0;
93
94 error:
95
96         pkg_free(hdr_append.s);
97         return -1;
98 }
99
100
101 int delete_db_subs(str pres_uri, str ev_stored_name, str to_tag)
102 {
103         db_key_t query_cols[5];
104         db_val_t query_vals[5];
105         int n_query_cols= 0;
106
107         query_cols[n_query_cols] = "presentity_uri";
108         query_vals[n_query_cols].type = DB_STR;
109         query_vals[n_query_cols].nul = 0;
110         query_vals[n_query_cols].val.str_val = pres_uri;
111         n_query_cols++;
112
113         query_cols[n_query_cols] = "event";
114         query_vals[n_query_cols].type = DB_STR;
115         query_vals[n_query_cols].nul = 0;
116         query_vals[n_query_cols].val.str_val = ev_stored_name;
117         n_query_cols++;
118
119         query_cols[n_query_cols] = "to_tag";
120         query_vals[n_query_cols].type = DB_STR;
121         query_vals[n_query_cols].nul = 0;
122         query_vals[n_query_cols].val.str_val = to_tag;
123         n_query_cols++;
124         
125         if (pa_dbf.use_table(pa_db, active_watchers_table) < 0) 
126         {
127                 LM_ERR("in use table sql operation\n");
128                 return -1;
129         }
130
131         if(pa_dbf.delete(pa_db, query_cols, 0, query_vals,
132                                 n_query_cols)< 0 )
133         {
134                 LM_ERR("cleaning unsubscribed messages\n");
135                 return -1;
136         }
137
138         return 0;
139 }
140
141 int update_subs_db(subs_t* subs, int type)
142 {
143         db_key_t query_cols[22], update_keys[7];
144         db_val_t query_vals[22], update_vals[7];
145         int n_update_cols= 0;
146         int n_query_cols = 0;
147
148         query_cols[n_query_cols] = "presentity_uri";
149         query_vals[n_query_cols].type = DB_STR;
150         query_vals[n_query_cols].nul = 0;
151         query_vals[n_query_cols].val.str_val = subs->pres_uri;
152         n_query_cols++;
153         
154         query_cols[n_query_cols] = "watcher_username";
155         query_vals[n_query_cols].type = DB_STR;
156         query_vals[n_query_cols].nul = 0;
157         query_vals[n_query_cols].val.str_val = subs->from_user;
158         n_query_cols++;
159         
160         query_cols[n_query_cols] = "watcher_domain";
161         query_vals[n_query_cols].type = DB_STR;
162         query_vals[n_query_cols].nul = 0;
163         query_vals[n_query_cols].val.str_val = subs->from_domain;
164         n_query_cols++;
165
166         query_cols[n_query_cols] = "event";
167         query_vals[n_query_cols].type = DB_STR;
168         query_vals[n_query_cols].nul = 0;
169         query_vals[n_query_cols].val.str_val = subs->event->name;
170         n_query_cols++;
171
172         if(subs->event_id.s)
173         {
174                 query_cols[n_query_cols] = "event_id";
175                 query_vals[n_query_cols].type = DB_STR;
176                 query_vals[n_query_cols].nul = 0;
177                 query_vals[n_query_cols].val.str_val = subs->event_id;
178                 n_query_cols++;
179         }
180         query_cols[n_query_cols] = "callid";
181         query_vals[n_query_cols].type = DB_STR;
182         query_vals[n_query_cols].nul = 0;
183         query_vals[n_query_cols].val.str_val = subs->callid;
184         n_query_cols++;
185
186         query_cols[n_query_cols] = "to_tag";
187         query_vals[n_query_cols].type = DB_STR;
188         query_vals[n_query_cols].nul = 0;
189         query_vals[n_query_cols].val.str_val = subs->to_tag;
190         n_query_cols++;
191
192         query_cols[n_query_cols] = "from_tag";
193         query_vals[n_query_cols].type = DB_STR;
194         query_vals[n_query_cols].nul = 0;
195         query_vals[n_query_cols].val.str_val = subs->from_tag;
196         n_query_cols++;
197
198         if(type & REMOTE_TYPE)
199         {
200                 update_keys[n_update_cols] = "expires";
201                 update_vals[n_update_cols].type = DB_INT;
202                 update_vals[n_update_cols].nul = 0;
203                 update_vals[n_update_cols].val.int_val = subs->expires + (int)time(NULL);
204                 n_update_cols++;
205         
206                 update_keys[n_update_cols] = "remote_cseq";
207                 update_vals[n_update_cols].type = DB_INT;
208                 update_vals[n_update_cols].nul = 0;
209                 update_vals[n_update_cols].val.int_val = subs->remote_cseq; 
210                 n_update_cols++;
211         }
212         else
213         {       
214                 update_keys[n_update_cols] = "local_cseq";
215                 update_vals[n_update_cols].type = DB_INT;
216                 update_vals[n_update_cols].nul = 0;
217                 update_vals[n_update_cols].val.int_val = subs->local_cseq+ 1;
218                 n_update_cols++;
219         
220                 update_keys[n_update_cols] = "version";
221                 update_vals[n_update_cols].type = DB_INT;
222                 update_vals[n_update_cols].nul = 0;
223                 update_vals[n_update_cols].val.int_val = subs->version+ 1; 
224                 n_update_cols++;
225         }
226
227         update_keys[n_update_cols] = "status";
228         update_vals[n_update_cols].type = DB_INT;
229         update_vals[n_update_cols].nul = 0;
230         update_vals[n_update_cols].val.int_val = subs->status;
231         n_update_cols++;
232
233         update_keys[n_update_cols] = "reason";
234         update_vals[n_update_cols].type = DB_STR;
235         update_vals[n_update_cols].nul = 0;
236         update_vals[n_update_cols].val.str_val = subs->reason;
237         n_update_cols++;
238
239         if (pa_dbf.use_table(pa_db, active_watchers_table) < 0) 
240         {
241                 LM_ERR("in use table sql operation\n"); 
242                 return -1;
243         }
244                 
245         if( pa_dbf.update( pa_db,query_cols, 0, query_vals,
246                                 update_keys, update_vals, n_query_cols,n_update_cols)<0) 
247         {
248                 LM_ERR("updating presence information\n");
249                 return -1;
250         }
251         return 0;
252 }
253
254 int update_subscription(struct sip_msg* msg, subs_t* subs, int to_tag_gen,
255                 int* sent_reply)
256 {       
257         unsigned int hash_code;
258         
259         printf_subs(subs);      
260         
261         *sent_reply= 0;
262
263         hash_code= core_hash(&subs->pres_uri, &subs->event->name, shtable_size);
264
265         if( to_tag_gen ==0) /*if a SUBSCRIBE within a dialog */
266         {
267                 if(subs->expires == 0)
268                 {
269                         LM_DBG("expires =0 -> deleting record\n");
270                 
271                         if( delete_db_subs(subs->pres_uri, 
272                                                 subs->event->name, subs->to_tag)< 0)
273                         {
274                                 LM_ERR("deleting subscription record from database\n");
275                                 goto error;
276                         }
277                         /* delete record from hash table also */
278
279                         subs->local_cseq= delete_shtable(subs_htable,hash_code,
280                                         subs->to_tag);
281                 
282                         if(subs->event->type & PUBL_TYPE)
283                         {       
284                                 if( send_2XX_reply(msg, 202, subs->expires, &subs->to_tag,
285                                                         &subs->local_contact) <0)
286                                 {
287                                         LM_ERR("sending 202 OK\n");
288                                         goto error;
289                                 }
290                                 *sent_reply= 1;
291                                 if(subs->event->wipeer)
292                                 {
293                                         if(query_db_notify(&subs->pres_uri,
294                                                                 subs->event->wipeer, NULL)< 0)
295                                         {
296                                                 LM_ERR("Could not send notify for winfo\n");
297                                                 goto error;
298                                         }
299                                 }
300
301                         }       
302                         else /* if unsubscribe for winfo */
303                         {
304                                 if( send_2XX_reply(msg, 200, subs->expires, &subs->to_tag,
305                                                         &subs->local_contact) <0)
306                                 {
307                                         LM_ERR("sending 202 OK reply\n");
308                                         goto error;
309                                 }
310                                 *sent_reply= 1;
311                         }
312                 
313                         subs->status= TERMINATED_STATUS;
314                         subs->reason.s= "timeout";
315                         subs->reason.len= 7;
316                         if(notify(subs, NULL, NULL, 0)< 0)
317                         {
318                                 LM_ERR("Could not send notify \n");
319                                 goto error;
320                         }
321                         return 1;
322                 }
323
324                 if(update_shtable(subs_htable, hash_code, subs, REMOTE_TYPE)< 0)
325                 {
326                         if(fallback2db)
327                         {
328                                 /* update in database table */
329                                 if(update_subs_db(subs, REMOTE_TYPE)< 0)
330                                 {
331                                         LM_ERR("updating subscription in database table\n");
332                                         goto error;
333                                 }
334                         }
335                         else
336                         {
337                                 LM_ERR("updating subscription record in hash table\n");
338                                 goto error;
339                         }
340                 }
341         }
342         else
343         {
344                 if(subs->expires!= 0)
345                 {       
346                         if(insert_shtable(subs_htable,hash_code,subs)< 0)
347                         {
348                                 LM_ERR("inserting new record in subs_htable\n");
349                                 goto error;
350                         }
351                 }
352                 /*otherwise there is a subscription outside a dialog with expires= 0 
353                  * no update in database, but should try to send Notify */
354         }
355
356 /* reply_and_notify  */
357
358         if(subs->event->type & PUBL_TYPE)
359         {       
360                 if(send_2XX_reply(msg, 202, subs->expires,&subs->to_tag,
361                                         &subs->local_contact)<0)
362                 {
363                         LM_ERR("sending 202 OK reply\n");
364                         goto error;
365                 }
366                 *sent_reply= 1;
367                 
368                 if(subs->expires!= 0 && subs->event->wipeer)
369                 {       
370                         LM_DBG("send Notify with winfo\n");
371                         if(query_db_notify(&subs->pres_uri, subs->event->wipeer, subs)< 0)
372                         {
373                                 LM_ERR("Could not send notify winfo\n");
374                                 goto error;
375                         }       
376                         if(subs->send_on_cback== 0)
377                         {       
378                                 if(notify(subs, NULL, NULL, 0)< 0)
379                                 {
380                                         LM_ERR("Could not send notify\n");
381                                         goto error;
382                                 }
383                         }
384                 }
385                 else
386                 {
387                         if(notify(subs, NULL, NULL, 0)< 0)
388                         {
389                                 LM_ERR("Could not send notify\n");
390                                 goto error;
391                         }
392                 }       
393                         
394         }
395         else 
396         {
397                 if( send_2XX_reply(msg, 200, subs->expires, &subs->to_tag,
398                                         &subs->local_contact)<0)
399                 {
400                         LM_ERR("sending 202 OK reply\n");
401                         goto error;
402                 }               
403                 *sent_reply= 1;
404                 
405                 if(notify(subs, NULL, NULL, 0 )< 0)
406                 {
407                         LM_ERR("sending notify request\n");
408                         goto error;
409                 }
410         }
411         return 0;
412         
413 error:
414
415         LM_ERR("occured\n");
416         return -1;
417
418 }
419
420 void msg_watchers_clean(unsigned int ticks,void *param)
421 {
422         db_key_t db_keys[3], result_cols[1];
423         db_val_t db_vals[3];
424         db_op_t  db_ops[3] ;
425         db_res_t *result= NULL;
426
427         LM_DBG("cleaning pending subscriptions\n");
428         
429         db_keys[0] ="inserted_time";
430         db_ops[0] = OP_LT;
431         db_vals[0].type = DB_INT;
432         db_vals[0].nul = 0;
433         db_vals[0].val.int_val = (int)time(NULL)- 24*3600 ;
434
435         db_keys[1] = "status";
436         db_ops [1] = OP_EQ;
437         db_vals[1].type = DB_INT;
438         db_vals[1].nul = 0;
439         db_vals[1].val.int_val = PENDING_STATUS;
440
441         result_cols[0]= "id";
442         if (pa_dbf.use_table(pa_db, watchers_table) < 0) 
443         {
444                 LM_ERR("unsuccessful use table sql operation\n");
445                 return ;
446         }
447         
448         if(pa_dbf.query(pa_db, db_keys, db_ops, db_vals, result_cols, 2, 1, 0, &result )< 0)
449         {
450                 LM_ERR("querying database for expired messages\n");
451                 if(result)
452                         pa_dbf.free_result(pa_db, result);
453                 return;
454         }
455         if(result == NULL)
456                 return;
457         if(result->n <= 0)
458         {
459                 pa_dbf.free_result(pa_db, result);
460                 return;
461         }
462         pa_dbf.free_result(pa_db, result);
463
464         if (pa_dbf.delete(pa_db, db_keys, db_ops, db_vals, 2) < 0) 
465                 LM_ERR("cleaning pending subscriptions\n");
466 }
467
468 int handle_subscribe(struct sip_msg* msg, char* str1, char* str2)
469 {
470         int  to_tag_gen = 0;
471         subs_t subs;
472         pres_ev_t* event= NULL;
473         event_t* parsed_event= NULL;
474         param_t* ev_param= NULL;
475         int found;
476         str reason= {0, 0};
477         struct sip_uri uri;
478         int reply_code;
479         str reply_str;
480         int sent_reply= 0;
481
482         /* ??? rename to avoid collisions with other symbols */
483         counter++;
484
485         memset(&subs, 0, sizeof(subs_t));
486         
487         reply_code= 500;
488         reply_str= pu_500_rpl;
489
490         if( parse_headers(msg,HDR_EOH_F, 0)==-1 )
491         {
492                 LM_ERR("parsing headers\n");
493                 reply_code= 400;
494                 reply_str= pu_400_rpl;
495                 goto error;
496         }
497         
498         /* inspecting the Event header field */
499         if(msg->event && msg->event->body.len > 0)
500         {
501                 if (!msg->event->parsed && (parse_event(msg->event) < 0))
502                 {
503                         reply_code= 400;
504                         reply_str= pu_400_rpl;
505                         goto error;
506                 }
507                 if(((event_t*)msg->event->parsed)->parsed & EVENT_OTHER)
508                 {       
509                         goto bad_event;
510                 }
511         }
512         else
513                 goto bad_event;
514
515         /* search event in the list */
516         parsed_event= (event_t*)msg->event->parsed;
517         event= search_event(parsed_event);
518         if(event== NULL)
519         {
520                 goto bad_event;
521         }
522         subs.event= event;
523         
524         /* extract the id if any*/
525         ev_param= parsed_event->params;
526         while(ev_param)
527         {
528                 if(ev_param->name.len== 2 && strncmp(ev_param->name.s, "id", 2)== 0)
529                 {
530                         subs.event_id= ev_param->body;
531                         break;
532                 }
533                 ev_param= ev_param->next;
534         }               
535         
536         if(extract_sdialog_info(&subs, msg, max_expires, &to_tag_gen)< 0)
537         {
538                 LM_ERR("while extracting dialog information\n");
539                 goto error;
540         }
541
542         /* getting presentity uri from Request-URI if initial subscribe - or else from database*/
543         if(to_tag_gen)
544         {
545                 if(uandd_to_uri(msg->parsed_uri.user, msg->parsed_uri.host,
546                                         &subs.pres_uri)< 0)
547                 {
548                         LM_ERR("while constructing uri from user and domain\n");
549                         goto error;
550                 }
551         }
552         else
553         {
554                 if(get_stored_info(msg, &subs, &reply_code, &reply_str )< 0)
555                 {
556                         LM_ERR("getting stored info\n");
557                         goto error;
558                 }
559                 reason= subs.reason;
560         }       
561
562         /* call event specific subscription handling */
563         if(event->evs_subs_handl)
564         {
565                 if(event->evs_subs_handl(msg)< 0)
566                 {
567                         LM_ERR("in event specific subscription handling\n");
568                         goto error;
569                 }
570         }       
571
572
573         /* if dialog initiation Subscribe - get subscription state */
574         if(to_tag_gen)
575         {
576                 if( subs.expires== 0 )
577                 {
578                         subs.status= TERMINATED_STATUS;
579                         subs.reason.s= "timeout";
580                         subs.reason.len= 7;
581                         goto after_status;      
582                 }       
583
584                 if(!event->req_auth) 
585                         subs.status = ACTIVE_STATUS;
586                 else   
587                 {
588                         /* query in watchers_table */
589                         if(get_db_subs_auth(&subs, &found)< 0)
590                         {
591                                 LM_ERR("getting subscription status from watchers table\n");
592                                 goto error;
593                         }
594                         if(found== 0)
595                         {
596                                 /*default 'pending' status */
597                                 subs.status= PENDING_STATUS;
598                                 subs.reason.s= NULL;
599                                 subs.reason.len= 0;
600                                 /* here a query to xcap server must be done -> new process maybe */
601                         
602                                 if(parse_uri(subs.pres_uri.s, subs.pres_uri.len, &uri)< 0)
603                                 {
604                                         LM_ERR("parsing uri\n");
605                                         goto error;
606
607                                 }
608                                 if(subs.event->get_rules_doc(&uri.user, &uri.host, &subs.auth_rules_doc)< 0)
609                                 {
610                                         LM_ERR("getting rules doc\n");
611                                         goto error;
612                                 }
613                                 
614                                 if(subs.event->get_auth_status(&subs)< 0)
615                                 {
616                                         LM_ERR("in event specific function is_watcher_allowed\n");
617                                         goto error;
618                                 }
619                                 if(get_status_str(subs.status)== NULL)
620                                 {
621                                         LM_ERR("wrong status= %d\n", subs.status);
622                                         goto error;
623                                 }
624                                 if(insert_db_subs_auth(&subs)< 0)
625                                 {
626                                         LM_ERR("while inserting record in watchers table\n");
627                                         goto error;
628                                 }
629                         }
630                         else
631                         {
632                                 reason= subs.reason;
633                         }
634                 }
635         }
636
637 after_status:
638         /* check if correct status */
639         if(get_status_str(subs.status)== NULL)
640         {
641                 LM_ERR("wrong status\n");
642                 goto error;
643         }
644         
645         if( update_subscription(msg, &subs, to_tag_gen, &sent_reply) <0 )
646         {       
647                 LM_ERR("in update_subscription\n");
648                 goto error;
649         }
650         if(subs.auth_rules_doc)
651         {
652                 pkg_free(subs.auth_rules_doc->s);
653                 pkg_free(subs.auth_rules_doc);
654         }
655         if(reason.s)
656                 pkg_free(reason.s);
657         
658         if(subs.pres_uri.s)
659                 pkg_free(subs.pres_uri.s);
660         
661         if((!server_address.s) || (server_address.len== 0))
662         {
663                 pkg_free(subs.local_contact.s);
664         }
665         if(subs.record_route.s)
666                 pkg_free(subs.record_route.s);
667
668         return 1;
669
670 bad_event:
671
672         LM_ERR("Missing or unsupported event header field value\n");
673                 
674         if(parsed_event)
675                 LM_ERR("\tevent= %.*s\n",parsed_event->text.len,parsed_event->text.s);
676         
677         reply_code= BAD_EVENT_CODE;
678         reply_str= pu_489_rpl;
679
680 error:
681         
682         if(sent_reply== 0)
683         {
684                 if(send_error_reply(msg, reply_code, reply_str)< 0)
685                 {
686                         LM_ERR("failed to send reply on error case\n");
687                 }
688         }
689
690         if(subs.pres_uri.s)
691                 pkg_free(subs.pres_uri.s);
692         
693         if(subs.auth_rules_doc)
694         {
695                 if(subs.auth_rules_doc->s)
696                         pkg_free(subs.auth_rules_doc->s);
697                 pkg_free(subs.auth_rules_doc);
698         }
699         if(reason.s)
700                 pkg_free(reason.s);
701
702         if(((!server_address.s) ||(server_address.len== 0))&& subs.local_contact.s)
703         {
704                 pkg_free(subs.local_contact.s);
705         }
706         if(subs.record_route.s)
707                 pkg_free(subs.record_route.s);
708
709         return -1;
710
711 }
712
713
714 int extract_sdialog_info(subs_t* subs,struct sip_msg* msg, int mexp, int* to_tag_gen)
715 {
716         static char buf[50];
717         str rec_route= {0, 0};
718         int rt  = 0;
719         str* contact= NULL;
720         contact_body_t *b;
721         struct to_body *pto, *pfrom = NULL, TO;
722         int lexpire;
723         str rtag_value;
724         struct sip_uri uri;
725
726         /* examine the expire header field */
727         if(msg->expires && msg->expires->body.len > 0)
728         {
729                 if (!msg->expires->parsed && (parse_expires(msg->expires) < 0))
730                 {
731                         LM_ERR("cannot parse Expires header\n");
732                         goto error;
733                 }
734                 lexpire = ((exp_body_t*)msg->expires->parsed)->val;
735                 LM_DBG("'Expires' header found, value= %d\n", lexpire);
736
737         }
738         else 
739         {
740                 LM_DBG("'expires' not found; default=%d\n",subs->event->default_expires);
741                 lexpire = subs->event->default_expires;
742         }
743         if(lexpire > mexp)
744                 lexpire = mexp;
745
746         subs->expires = lexpire;
747
748         if( msg->to==NULL || msg->to->body.s==NULL)
749         {
750                 LM_ERR("cannot parse TO header\n");
751                 goto error;
752         }
753         /* examine the to header */
754         if(msg->to->parsed != NULL)
755         {
756                 pto = (struct to_body*)msg->to->parsed;
757                 LM_DBG("'To' header ALREADY PARSED: <%.*s>\n",pto->uri.len,pto->uri.s);
758         }
759         else
760         {
761                 memset( &TO , 0, sizeof(TO) );
762                 if( !parse_to(msg->to->body.s,msg->to->body.s + msg->to->body.len + 1, &TO));
763                 {
764                         LM_DBG("'To' header NOT parsed\n");
765                         goto error;
766                 }
767                 pto = &TO;
768         }
769
770         if( pto->parsed_uri.user.s && pto->parsed_uri.host.s &&
771                 pto->parsed_uri.user.len && pto->parsed_uri.host.len)
772         {
773                 subs->to_user = pto->parsed_uri.user;
774                 subs->to_domain = pto->parsed_uri.host;
775         }
776         else
777         {
778                 if(parse_uri(pto->uri.s, pto->uri.len, &uri)< 0)
779                 {
780                         LM_ERR("while parsing uri\n");
781                         goto error;
782                 }
783                 subs->to_user = uri.user;
784                 subs->to_domain = uri.host;
785         }
786
787         /* examine the from header */
788         if (!msg->from || !msg->from->body.s)
789         {
790                 LM_DBG("cannot find 'from' header!\n");
791                 goto error;
792         }
793         if (msg->from->parsed == NULL)
794         {
795                 LM_DBG("'From' header not parsed\n");
796                 /* parsing from header */
797                 if ( parse_from_header( msg )<0 ) 
798                 {
799                         LM_DBG("cannot parse From header\n");
800                         goto error;
801                 }
802         }
803         pfrom = (struct to_body*)msg->from->parsed;
804         
805         if( pfrom->parsed_uri.user.s && pfrom->parsed_uri.host.s && 
806                 pfrom->parsed_uri.user.len && pfrom->parsed_uri.host.len)
807         {
808                 subs->from_user = pfrom->parsed_uri.user;
809                 subs->from_domain = pfrom->parsed_uri.host;
810         }
811         else
812         {
813                 if(parse_uri(pfrom->uri.s, pfrom->uri.len, &uri)< 0)
814                 {
815                         LM_ERR("while parsing uri\n");
816                         goto error;
817                 }
818                 subs->from_user = uri.user;
819                 subs->from_domain = uri.host;
820         }
821
822
823         /*generate to_tag if the message does not have a to_tag*/
824         if (pto->tag_value.s==NULL || pto->tag_value.len==0 )
825         {  
826                 LM_DBG("generating to_tag\n");
827                 *to_tag_gen = 1;
828                 /*generate to_tag then insert it in avp*/
829                 
830                 rtag_value.s = buf;
831                 rtag_value.len = sprintf(rtag_value.s,"%s.%d.%d.%d", to_tag_pref,
832                                 pid, (int)time(NULL), counter);
833                 if(rtag_value.len<= 0)
834                 {
835                         LM_ERR("while creating to_tag\n");
836                         goto error;
837                 }
838         }
839         else
840         {
841                 *to_tag_gen = 0;
842                 rtag_value=pto->tag_value;
843         }
844         subs->to_tag = rtag_value;
845
846         if( msg->callid==NULL || msg->callid->body.s==NULL)
847         {
848                 LM_ERR("cannot parse callid header\n");
849                 goto error;
850         }
851         subs->callid = msg->callid->body;
852
853         if( msg->cseq==NULL || msg->cseq->body.s==NULL)
854         {
855                 LM_ERR("cannot parse cseq header\n");
856                 goto error;
857         }
858         if (str2int( &(get_cseq(msg)->number), &subs->remote_cseq)!=0 )
859         {
860                 LM_ERR("cannot parse cseq number\n");
861                 goto error;
862         }
863         if( msg->contact==NULL || msg->contact->body.s==NULL)
864         {
865                 LM_ERR("cannot parse contact header\n");
866                 goto error;
867         }
868         if( parse_contact(msg->contact) <0 )
869         {
870                 LM_ERR(" cannot parse contact"
871                                 " header\n");
872                 goto error;
873         }
874         b= (contact_body_t* )msg->contact->parsed;
875
876         if(b == NULL)
877         {
878                 LM_ERR("cannot parse contact header\n");
879                 goto error;
880         }
881         subs->contact = b->contacts->uri;
882         
883         LM_DBG("subs->contact= %.*s - len = %d\n",subs->contact.len,
884                         subs->contact.s, subs->contact.len);    
885
886         /*process record route and add it to a string*/
887         if (to_tag_gen && msg->record_route!=NULL)
888         {
889                 rt = print_rr_body(msg->record_route, &rec_route, 0, 0);
890                 if(rt != 0)
891                 {
892                         LM_ERR("processing the record route [%d]\n", rt);       
893                         rec_route.s=NULL;
894                         rec_route.len=0;
895                 //      goto error;
896                 }
897         }
898         subs->record_route = rec_route;
899                         
900         subs->sockinfo_str= msg->rcv.bind_address->sock_str;
901
902         if( pfrom->tag_value.s ==NULL || pfrom->tag_value.len == 0)
903         {
904                 LM_ERR("no from tag value present\n");
905                 goto error;
906         }
907         subs->from_tag = pfrom->tag_value;
908
909         subs->version = 0;
910         
911         if((!server_address.s) || (server_address.len== 0))
912         {
913                 contact= get_local_contact(msg);
914                 if(contact== NULL)
915                 {
916                         LM_ERR("in function get_local_contact\n");
917                         goto error;
918                 }
919                 subs->local_contact= *contact;
920         }
921         else
922                 subs->local_contact= server_address;
923         
924         return 0;
925         
926 error:
927
928         return -1;
929 }
930
931
932 int get_stored_info(struct sip_msg* msg, subs_t* subs, int* reply_code,
933                 str* reply_str)
934 {       
935         str pres_uri= {0, 0}, reason={0, 0};
936         subs_t* s;
937         int i;
938         unsigned int hash_code;
939
940         /* first try to_user== pres_user and to_domain== pres_domain */
941
942         uandd_to_uri(subs->to_user, subs->to_domain, &pres_uri);
943         if(pres_uri.s== NULL)
944         {
945                 LM_ERR("creating uri from user and domain\n");
946                 return -1;
947         }
948         hash_code= core_hash(&pres_uri, &subs->event->name, shtable_size);
949         lock_get(&subs_htable[hash_code].lock);
950         i= hash_code;
951         s= search_shtable(subs_htable, subs->callid, subs->to_tag,
952                         subs->from_tag, hash_code);
953         if(s)
954         {
955                 goto found_rec;
956         }
957         lock_release(&subs_htable[hash_code].lock);
958
959         pkg_free(pres_uri.s);
960         pres_uri.s= NULL;
961         LM_DBG("record not found using R-URI search iteratively\n");
962         /* take one row at a time */
963         for(i= 0; i< shtable_size; i++)
964         {
965                 lock_get(&subs_htable[i].lock);
966                 s= search_shtable(subs_htable, subs->callid,subs->to_tag,subs->from_tag, i);
967                 if(s)
968                 {
969                         pres_uri.s= (char*)pkg_malloc(s->pres_uri.len* sizeof(char));
970                         if(pres_uri.s== NULL)
971                         {
972                                 lock_release(&subs_htable[i].lock);
973                                 ERR_MEM(PKG_MEM_STR);
974                         }
975                         memcpy(pres_uri.s, s->pres_uri.s, s->pres_uri.len);
976                         pres_uri.len= s->pres_uri.len;
977                         goto found_rec;
978                 }
979                 lock_release(&subs_htable[i].lock);
980         }
981
982         if(fallback2db)
983         {
984                 return get_database_info(msg, subs, reply_code, reply_str);     
985         }
986
987         LM_ERR("record not found in hash_table\n");
988         *reply_code= 481;
989         *reply_str= pu_481_rpl;
990
991         return -1;
992
993 found_rec:
994         
995         LM_DBG("Record found in hash_table\n");
996         subs->pres_uri= pres_uri;
997         subs->status= s->status;
998         if(s->reason.s && s->reason.len)
999         {       
1000                 reason.s= (char*)pkg_malloc(s->reason.len* sizeof(char));
1001                 if(reason.s== NULL)
1002                 {
1003                         lock_release(&subs_htable[i].lock);
1004                         ERR_MEM(PKG_MEM_STR);
1005                 }
1006                 memcpy(reason.s, s->reason.s, s->reason.len);
1007                 reason.len= s->reason.len;
1008                 subs->reason= reason;
1009         }
1010         if(s->record_route.s && s->record_route.len)
1011         {
1012                 subs->record_route.s= (char*)pkg_malloc
1013                         (s->record_route.len* sizeof(char));
1014                 if(subs->record_route.s== NULL)
1015                 {
1016                         ERR_MEM(PKG_MEM_STR);
1017                 }
1018                 memcpy(subs->record_route.s, s->record_route.s, s->record_route.len);
1019                 subs->record_route.len= s->record_route.len;
1020         }
1021
1022         subs->local_cseq= s->local_cseq;
1023         
1024         if(subs->remote_cseq<= s->remote_cseq)
1025         {
1026                 LM_ERR("wrong sequence number;received: %d - stored: %d\n",
1027                                 subs->remote_cseq, s->remote_cseq);
1028                 
1029                 *reply_code= 400;
1030                 *reply_str= pu_400_rpl;
1031
1032                 lock_release(&subs_htable[i].lock);
1033                 goto error;
1034         }       
1035         lock_release(&subs_htable[i].lock);
1036
1037         return 0;
1038
1039 error:
1040         if(subs->reason.s)
1041                 pkg_free(subs->reason.s);
1042         subs->reason.s= NULL;
1043         if(subs->record_route.s)
1044                 pkg_free(subs->record_route.s);
1045         subs->record_route.s= NULL;
1046         return -1;
1047 }
1048
1049 int get_database_info(struct sip_msg* msg, subs_t* subs, int* reply_code, str* reply_str)
1050 {       
1051         db_key_t query_cols[10];
1052         db_val_t query_vals[10];
1053         db_key_t result_cols[8];
1054         db_res_t *result= NULL;
1055         db_row_t *row ; 
1056         db_val_t *row_vals ;
1057         int n_query_cols = 0;
1058         int n_result_cols = 0;
1059         int remote_cseq_col= 0, local_cseq_col= 0, status_col, reason_col;
1060         int record_route_col;
1061         int pres_uri_col;
1062         unsigned int remote_cseq;
1063         str pres_uri, record_route;
1064         str reason;
1065
1066         query_cols[n_query_cols] = "to_user";
1067         query_vals[n_query_cols].type = DB_STR;
1068         query_vals[n_query_cols].nul = 0;
1069         query_vals[n_query_cols].val.str_val = subs->to_user;
1070         n_query_cols++;
1071         
1072         query_cols[n_query_cols] = "to_domain";
1073         query_vals[n_query_cols].type = DB_STR;
1074         query_vals[n_query_cols].nul = 0;
1075         query_vals[n_query_cols].val.str_val = subs->to_domain;
1076         n_query_cols++;
1077
1078         query_cols[n_query_cols] = "watcher_username";
1079         query_vals[n_query_cols].type = DB_STR;
1080         query_vals[n_query_cols].nul = 0;
1081         query_vals[n_query_cols].val.str_val = subs->from_user;
1082         n_query_cols++;
1083         
1084         query_cols[n_query_cols] = "watcher_domain";
1085         query_vals[n_query_cols].type = DB_STR;
1086         query_vals[n_query_cols].nul = 0;
1087         query_vals[n_query_cols].val.str_val = subs->from_domain;
1088         n_query_cols++;
1089
1090         query_cols[n_query_cols] = "event";
1091         query_vals[n_query_cols].type = DB_STR;
1092         query_vals[n_query_cols].nul = 0;
1093         query_vals[n_query_cols].val.str_val = subs->event->name;
1094         n_query_cols++;
1095
1096         query_cols[n_query_cols] = "event_id";
1097         query_vals[n_query_cols].type = DB_STR;
1098         query_vals[n_query_cols].nul = 0;
1099         if( subs->event_id.s != NULL)
1100         {
1101                 query_vals[n_query_cols].val.str_val.s = subs->event_id.s;
1102                 query_vals[n_query_cols].val.str_val.len = subs->event_id.len;
1103         } else {
1104                 query_vals[n_query_cols].val.str_val.s = "";
1105                 query_vals[n_query_cols].val.str_val.len = 0;
1106         }
1107         n_query_cols++;
1108         
1109         query_cols[n_query_cols] = "callid";
1110         query_vals[n_query_cols].type = DB_STR;
1111         query_vals[n_query_cols].nul = 0;
1112         query_vals[n_query_cols].val.str_val = subs->callid;
1113         n_query_cols++;
1114
1115         query_cols[n_query_cols] = "to_tag";
1116         query_vals[n_query_cols].type = DB_STR;
1117         query_vals[n_query_cols].nul = 0;
1118         query_vals[n_query_cols].val.str_val = subs->to_tag;
1119         n_query_cols++;
1120
1121         query_cols[n_query_cols] = "from_tag";
1122         query_vals[n_query_cols].type = DB_STR;
1123         query_vals[n_query_cols].nul = 0;
1124         query_vals[n_query_cols].val.str_val = subs->from_tag;
1125         n_query_cols++;
1126
1127         result_cols[pres_uri_col=n_result_cols++] = "presentity_uri";
1128         result_cols[remote_cseq_col=n_result_cols++] = "remote_cseq";
1129         result_cols[local_cseq_col=n_result_cols++] = "local_cseq";
1130         result_cols[status_col=n_result_cols++] = "status";
1131         result_cols[reason_col=n_result_cols++] = "reason";
1132         result_cols[record_route_col=n_result_cols++] = "record_route";
1133         
1134         if (pa_dbf.use_table(pa_db, active_watchers_table) < 0) 
1135         {
1136                 LM_ERR("unsuccessful use_table sql operation\n");
1137                 return -1;
1138         }
1139         
1140         if (pa_dbf.query (pa_db, query_cols, 0, query_vals,
1141                  result_cols, n_query_cols, n_result_cols, 0,  &result) < 0) 
1142         {
1143                 LM_ERR("querying subscription dialog\n");
1144                 if(result)
1145                         pa_dbf.free_result(pa_db, result);
1146                 return -1;
1147         }
1148         if(result== NULL)
1149                 return -1;
1150
1151         if(result && result->n <=0)
1152         {
1153                 LM_ERR("No matching subscription dialog found in database\n");
1154                 
1155                 pa_dbf.free_result(pa_db, result);
1156                 *reply_code= 481;
1157                 *reply_str= pu_481_rpl;
1158
1159                 return -1;
1160         }
1161
1162         row = &result->rows[0];
1163         row_vals = ROW_VALUES(row);
1164         remote_cseq= row_vals[remote_cseq_col].val.int_val;
1165         
1166         if(subs->remote_cseq<= remote_cseq)
1167         {
1168                 LM_ERR("wrong sequence number received: %d - stored: %d\n",
1169                                 subs->remote_cseq, remote_cseq);
1170                 *reply_code= 400;
1171                 *reply_str= pu_400_rpl;
1172                 pa_dbf.free_result(pa_db, result);
1173                 return -1;
1174         }
1175         
1176         subs->status= row_vals[status_col].val.int_val;
1177         reason.s= (char*)row_vals[reason_col].val.string_val;
1178         if(reason.s)
1179         {
1180                 reason.len= strlen(reason.s);
1181                 subs->reason.s= (char*)pkg_malloc(reason.len* sizeof(char));
1182                 if(subs->reason.s== NULL)
1183                 {
1184                         ERR_MEM(PKG_MEM_STR);
1185                 }
1186                 memcpy(subs->reason.s, reason.s, reason.len);
1187                 subs->reason.len= reason.len;
1188         }
1189
1190         subs->local_cseq= row_vals[local_cseq_col].val.int_val;
1191         
1192         pres_uri.s= (char*)row_vals[pres_uri_col].val.string_val;
1193         pres_uri.len= strlen(pres_uri.s);
1194         subs->pres_uri.s= (char*)pkg_malloc(pres_uri.len* sizeof(char));
1195         if(subs->pres_uri.s== NULL)
1196         {
1197                 if(subs->reason.s)
1198                         pkg_free(subs->reason.s);
1199                 ERR_MEM(PKG_MEM_STR);
1200         }
1201         memcpy(subs->pres_uri.s, pres_uri.s, pres_uri.len);
1202         subs->pres_uri.len= pres_uri.len;
1203
1204         record_route.s= (char*)row_vals[record_route_col].val.string_val;
1205         if(record_route.s)
1206         {
1207                 record_route.len= strlen(record_route.s);
1208                 subs->record_route.s= (char*)pkg_malloc(record_route.len*sizeof(char));
1209                 if(subs->record_route.s== NULL)
1210                 {
1211                         ERR_MEM(PKG_MEM_STR);
1212                 }
1213                 memcpy(subs->record_route.s, record_route.s, record_route.len);
1214                 subs->record_route.len= record_route.len;
1215         }
1216
1217
1218         pa_dbf.free_result(pa_db, result);
1219         result= NULL;
1220
1221         return 0;
1222 error:
1223         if(result)
1224                 pa_dbf.free_result(pa_db, result);
1225
1226         return -1;
1227
1228 }
1229
1230
1231 int handle_expired_subs(subs_t* s)
1232 {
1233         /* send Notify with state=terminated;reason=timeout */
1234         
1235         s->status= TERMINATED_STATUS;
1236         s->reason.s= "timeout";
1237         s->reason.len= 7;
1238         s->expires= 0;
1239
1240         if(send_notify_request(s, NULL, NULL, 0)< 0)
1241         {
1242                 LM_ERR("send Notify not successful\n");
1243                 return -1;
1244         }
1245         
1246         return 0;
1247
1248 }
1249
1250 void timer_db_update(unsigned int ticks,void *param)
1251 {       
1252         int no_lock;
1253
1254         if(ticks== 0 && param == NULL)
1255                 no_lock= 1;
1256
1257         if(pa_dbf.use_table(pa_db, active_watchers_table)< 0)
1258         {
1259                 LM_ERR("sql use table failed\n");
1260                 return;
1261         }
1262
1263         update_db_subs(pa_db, pa_dbf, subs_htable, 
1264                         shtable_size, no_lock, handle_expired_subs);
1265
1266 }
1267
1268 void update_db_subs(db_con_t *db,db_func_t dbf, shtable_t hash_table,
1269         int htable_size, int no_lock, handle_expired_func_t handle_expired_func)
1270 {       
1271         db_key_t query_cols[22], update_cols[7], result_cols[6];
1272         db_val_t query_vals[22], update_vals[7];
1273         db_op_t update_ops[1];
1274         subs_t* del_s;
1275
1276         int pres_uri_col, to_user_col, to_domain_col, from_user_col, from_domain_col,
1277                 callid_col, totag_col, fromtag_col, event_col,status_col, event_id_col, 
1278                 local_cseq_col, remote_cseq_col, expires_col, record_route_col, 
1279                 contact_col, local_contact_col, version_col,socket_info_col,reason_col;
1280         int u_expires_col, u_local_cseq_col, u_remote_cseq_col, u_version_col, 
1281                 u_reason_col, u_status_col; 
1282         int i;
1283         subs_t* s= NULL, *prev_s= NULL;
1284         int n_query_cols= 0, n_update_cols= 0;
1285         int n_query_update;
1286
1287         query_cols[pres_uri_col= n_query_cols] ="presentity_uri";
1288         query_vals[pres_uri_col].type = DB_STR;
1289         query_vals[pres_uri_col].nul = 0;
1290         n_query_cols++;
1291         
1292         query_cols[callid_col= n_query_cols] ="callid";
1293         query_vals[callid_col].type = DB_STR;
1294         query_vals[callid_col].nul = 0;
1295         n_query_cols++;
1296
1297         query_cols[totag_col= n_query_cols] ="to_tag";
1298         query_vals[totag_col].type = DB_STR;
1299         query_vals[totag_col].nul = 0;
1300         n_query_cols++;
1301
1302         query_cols[fromtag_col= n_query_cols] ="from_tag";
1303         query_vals[fromtag_col].type = DB_STR;
1304         query_vals[fromtag_col].nul = 0;
1305         n_query_cols++;
1306
1307         n_query_update= n_query_cols;
1308
1309         query_cols[to_user_col= n_query_cols] ="to_user";
1310         query_vals[to_user_col].type = DB_STR;
1311         query_vals[to_user_col].nul = 0;
1312         n_query_cols++;
1313
1314         query_cols[to_domain_col= n_query_cols] ="to_domain";
1315         query_vals[to_domain_col].type = DB_STR;
1316         query_vals[to_domain_col].nul = 0;
1317         n_query_cols++;
1318         
1319         query_cols[from_user_col= n_query_cols] ="watcher_username";
1320         query_vals[from_user_col].type = DB_STR;
1321         query_vals[from_user_col].nul = 0;
1322         n_query_cols++;
1323
1324         query_cols[from_domain_col= n_query_cols] ="watcher_domain";
1325         query_vals[from_domain_col].type = DB_STR;
1326         query_vals[from_domain_col].nul = 0;
1327         n_query_cols++;
1328
1329         query_cols[event_col= n_query_cols] ="event";
1330         query_vals[event_col].type = DB_STR;
1331         query_vals[event_col].nul = 0;
1332         n_query_cols++; 
1333
1334         query_cols[event_id_col= n_query_cols] ="event_id";
1335         query_vals[event_id_col].type = DB_STR;
1336         query_vals[event_id_col].nul = 0;
1337         n_query_cols++;
1338
1339         query_cols[local_cseq_col= n_query_cols]="local_cseq";
1340         query_vals[local_cseq_col].type = DB_INT;
1341         query_vals[local_cseq_col].nul = 0;
1342         n_query_cols++;
1343
1344         query_cols[remote_cseq_col= n_query_cols]="remote_cseq";
1345         query_vals[remote_cseq_col].type = DB_INT;
1346         query_vals[remote_cseq_col].nul = 0;
1347         n_query_cols++;
1348
1349         query_cols[expires_col= n_query_cols] ="expires";
1350         query_vals[expires_col].type = DB_INT;
1351         query_vals[expires_col].nul = 0;
1352         n_query_cols++;
1353
1354         query_cols[status_col= n_query_cols] ="status";
1355         query_vals[status_col].type = DB_INT;
1356         query_vals[status_col].nul = 0;
1357         n_query_cols++;
1358
1359         query_cols[reason_col= n_query_cols] ="reason";
1360         query_vals[reason_col].type = DB_STR;
1361         query_vals[reason_col].nul = 0;
1362         n_query_cols++;
1363
1364         query_cols[record_route_col= n_query_cols] ="record_route";
1365         query_vals[record_route_col].type = DB_STR;
1366         query_vals[record_route_col].nul = 0;
1367         n_query_cols++;
1368         
1369         query_cols[contact_col= n_query_cols] ="contact";
1370         query_vals[contact_col].type = DB_STR;
1371         query_vals[contact_col].nul = 0;
1372         n_query_cols++;
1373
1374         query_cols[local_contact_col= n_query_cols] ="local_contact";
1375         query_vals[local_contact_col].type = DB_STR;
1376         query_vals[local_contact_col].nul = 0;
1377         n_query_cols++;
1378
1379         query_cols[socket_info_col= n_query_cols] ="socket_info";
1380         query_vals[socket_info_col].type = DB_STR;
1381         query_vals[socket_info_col].nul = 0;
1382         n_query_cols++;
1383
1384         query_cols[version_col= n_query_cols]="version";
1385         query_vals[version_col].type = DB_INT;
1386         query_vals[version_col].nul = 0;
1387         n_query_cols++;
1388
1389         /* cols and values used for update */
1390         update_cols[u_expires_col= n_update_cols]= "expires";
1391         update_vals[u_expires_col].type = DB_INT;
1392         update_vals[u_expires_col].nul = 0;
1393         n_update_cols++;
1394
1395         update_cols[u_status_col= n_update_cols]= "status";
1396         update_vals[u_status_col].type = DB_INT;
1397         update_vals[u_status_col].nul = 0;
1398         n_update_cols++;
1399
1400         update_cols[u_reason_col= n_update_cols]= "reason";
1401         update_vals[u_reason_col].type = DB_STR;
1402         update_vals[u_reason_col].nul = 0;
1403         n_update_cols++;
1404
1405         update_cols[u_remote_cseq_col= n_update_cols]= "remote_cseq";
1406         update_vals[u_remote_cseq_col].type = DB_INT;
1407         update_vals[u_remote_cseq_col].nul = 0;
1408         n_update_cols++;
1409
1410         update_cols[u_local_cseq_col= n_update_cols]= "local_cseq";
1411         update_vals[u_local_cseq_col].type = DB_INT;
1412         update_vals[u_local_cseq_col].nul = 0;
1413         n_update_cols++;
1414         
1415         update_cols[u_version_col= n_update_cols]= "version";
1416         update_vals[u_version_col].type = DB_INT;
1417         update_vals[u_version_col].nul = 0;
1418         n_update_cols++;
1419
1420         result_cols[0]= "expires";
1421
1422         if(db== NULL)
1423         {
1424                 LM_ERR("null database connection\n");
1425                 return;
1426         }
1427         for(i=0; i<htable_size; i++) 
1428         {
1429                 if(!no_lock)
1430                         lock_get(&hash_table[i].lock);  
1431
1432                 prev_s= hash_table[i].entries;
1433                 s= prev_s->next;
1434         
1435                 while(s)
1436                 {
1437                         printf_subs(s);
1438                         if(s->expires < (int)time(NULL)- 50)    
1439                         {
1440                                 LM_DBG("Found expired record\n");
1441                                 if(!no_lock)
1442                                 {
1443                                         if(handle_expired_func(s)< 0)
1444                                         {
1445                                                 LM_ERR("in function handle_expired_record\n");
1446                                                 if(!no_lock)
1447                                                         lock_release(&hash_table[i].lock);      
1448                                                 return ;
1449                                         }
1450                                 }
1451                                 del_s= s;       
1452                                 s= s->next;
1453                                 prev_s->next= s;
1454                                 
1455                                 shm_free(del_s);
1456                                 continue;
1457                         }
1458                         switch(s->db_flag)
1459                         {
1460                                 case NO_UPDATEDB_FLAG:
1461                                 {
1462                                         LM_DBG("NO_UPDATEDB_FLAG\n");
1463                                         break;                    
1464                                 }
1465                                 case UPDATEDB_FLAG:
1466                                 {
1467                                         LM_DBG("UPDATEDB_FLAG\n");
1468
1469                                         query_vals[pres_uri_col].val.str_val= s->pres_uri;
1470                                         query_vals[callid_col].val.str_val= s->callid;
1471                                         query_vals[totag_col].val.str_val= s->to_tag;
1472                                         query_vals[fromtag_col].val.str_val= s->from_tag;
1473                                 
1474                                         update_vals[u_expires_col].val.int_val= s->expires;
1475                                         update_vals[u_local_cseq_col].val.int_val= s->local_cseq;
1476                                         update_vals[u_remote_cseq_col].val.int_val= s->remote_cseq;
1477                                         update_vals[u_version_col].val.int_val= s->version;
1478                                         update_vals[u_status_col].val.int_val= s->status;
1479                                         update_vals[u_reason_col].val.str_val= s->reason;
1480
1481                                         if(dbf.update(db, query_cols, 0, query_vals, update_cols, 
1482                                                                 update_vals, n_query_update, n_update_cols)< 0)
1483                                         {
1484                                                 LM_ERR("updating in database\n");
1485                                                 if(!no_lock)
1486                                                         lock_release(&hash_table[i].lock);      
1487                                                 return ;
1488                                         }
1489                                         break;
1490                                 }
1491                                 case  INSERTDB_FLAG:
1492                                 {
1493                                         LM_DBG("INSERTDB_FLAG\n");
1494
1495                                         query_vals[pres_uri_col].val.str_val= s->pres_uri;
1496                                         query_vals[callid_col].val.str_val= s->callid;
1497                                         query_vals[totag_col].val.str_val= s->to_tag;
1498                                         query_vals[fromtag_col].val.str_val= s->from_tag;
1499                                         query_vals[to_user_col].val.str_val = s->to_user;
1500                                         query_vals[to_domain_col].val.str_val = s->to_domain;
1501                                         query_vals[from_user_col].val.str_val = s->from_user;
1502                                         query_vals[from_domain_col].val.str_val = s->from_domain;
1503                                         query_vals[event_col].val.str_val = s->event->name;
1504                                         query_vals[event_id_col].val.str_val = s->event_id;
1505                                         query_vals[local_cseq_col].val.int_val= s->local_cseq;
1506                                         query_vals[remote_cseq_col].val.int_val= s->remote_cseq;
1507                                         query_vals[expires_col].val.int_val = s->expires;
1508                                         query_vals[record_route_col].val.str_val = s->record_route;
1509                                         query_vals[contact_col].val.str_val = s->contact;
1510                                         query_vals[local_contact_col].val.str_val = s->local_contact;
1511                                         query_vals[version_col].val.int_val= s->version;
1512                                         query_vals[status_col].val.int_val= s->status;
1513                                         query_vals[reason_col].val.str_val= s->reason;
1514                                         query_vals[socket_info_col].val.str_val= s->sockinfo_str;
1515                                 
1516                                         if(dbf.insert(db,query_cols,query_vals,n_query_cols )<0)
1517                                         {
1518                                                 LM_ERR("unsuccessful sql insert\n");
1519                                                 if(!no_lock)
1520                                                         lock_release(&hash_table[i].lock);
1521                                                 return ;
1522                                         }
1523                                         break;                                                                          
1524                                 }
1525
1526                         }
1527                         s->db_flag= NO_UPDATEDB_FLAG;   
1528                         prev_s= s;
1529                         s= s->next;
1530                 }
1531                 if(!no_lock)
1532                         lock_release(&hash_table[i].lock);      
1533         }
1534
1535         update_vals[0].val.int_val= (int)time(NULL)- 10;
1536         update_ops[0]= OP_LT;
1537         if(dbf.delete(db, update_cols, update_ops, update_vals, 1) < 0)
1538         {
1539                 LM_ERR("deleting expired information from database\n");
1540         }
1541
1542 }
1543
1544 int restore_db_subs(void)
1545 {
1546         db_key_t result_cols[22]; 
1547         db_res_t *res= NULL;
1548         db_row_t *row = NULL;   
1549         db_val_t *row_vals= NULL;
1550         int i;
1551         int n_result_cols= 0;
1552         int pres_uri_col, expires_col, from_user_col, from_domain_col,to_user_col; 
1553         int callid_col,totag_col,fromtag_col,to_domain_col,sockinfo_col,reason_col;
1554         int event_col,contact_col,record_route_col, event_id_col, status_col;
1555         int remote_cseq_col, local_cseq_col, local_contact_col, version_col;
1556         subs_t s;
1557         str ev_sname;
1558         pres_ev_t* event= NULL;
1559         event_t parsed_event;
1560         unsigned int expires;
1561         unsigned int hash_code;
1562
1563         result_cols[pres_uri_col=n_result_cols++]       ="presentity_uri";              
1564         result_cols[expires_col=n_result_cols++]="expires";
1565         result_cols[event_col=n_result_cols++]  ="event";
1566         result_cols[event_id_col=n_result_cols++]="event_id";
1567         result_cols[to_user_col=n_result_cols++]        ="to_user";
1568         result_cols[to_domain_col=n_result_cols++]      ="to_domain";
1569         result_cols[from_user_col=n_result_cols++]      ="watcher_username";
1570         result_cols[from_domain_col=n_result_cols++]="watcher_domain";
1571         result_cols[callid_col=n_result_cols++] ="callid";
1572         result_cols[totag_col=n_result_cols++]  ="to_tag";
1573         result_cols[fromtag_col=n_result_cols++]="from_tag";
1574         result_cols[local_cseq_col= n_result_cols++]    ="local_cseq";
1575         result_cols[remote_cseq_col= n_result_cols++]   ="remote_cseq";
1576         result_cols[record_route_col= n_result_cols++]  ="record_route";
1577         result_cols[sockinfo_col= n_result_cols++]      ="socket_info";
1578         result_cols[contact_col= n_result_cols++]       ="contact";
1579         result_cols[local_contact_col= n_result_cols++] ="local_contact";
1580         result_cols[version_col= n_result_cols++]       ="version";
1581         result_cols[status_col= n_result_cols++]        ="status";
1582         result_cols[reason_col= n_result_cols++]        ="reason";
1583         
1584         if(!pa_db)
1585         {
1586                 LM_ERR("null database connection\n");
1587                 return -1;
1588         }
1589         if(pa_dbf.use_table(pa_db, active_watchers_table)< 0)
1590         {
1591                 LM_ERR("in use table\n");
1592                 return -1;
1593         }
1594
1595         if(pa_dbf.query(pa_db,0, 0, 0, result_cols,0, n_result_cols, 0,&res)< 0)
1596         {
1597                 LM_ERR("while querrying table\n");
1598                 if(res)
1599                 {
1600                         pa_dbf.free_result(pa_db, res);
1601                         res = NULL;
1602                 }
1603                 return -1;
1604         }
1605         if(res== NULL)
1606                 return -1;
1607
1608         if(res->n<=0)
1609         {
1610                 LM_INFO("The query returned no result\n");
1611                 pa_dbf.free_result(pa_db, res);
1612                 res = NULL;
1613                 return 0;
1614         }
1615
1616         LM_DBG("found %d db entries\n", res->n);
1617
1618         for(i =0 ; i< res->n ; i++)
1619         {
1620                 row = &res->rows[i];
1621                 row_vals = ROW_VALUES(row);
1622                 memset(&s, 0, sizeof(subs_t));
1623
1624                 expires= row_vals[expires_col].val.int_val;
1625                 
1626                 if(expires< (int)time(NULL))
1627                         continue;
1628         
1629                 s.pres_uri.s= (char*)row_vals[pres_uri_col].val.string_val;
1630                 s.pres_uri.len= strlen(s.pres_uri.s);
1631                 
1632                 s.to_user.s=(char*)row_vals[to_user_col].val.string_val;
1633                 s.to_user.len= strlen(s.to_user.s);
1634
1635                 s.to_domain.s=(char*)row_vals[to_domain_col].val.string_val;
1636                 s.to_domain.len= strlen(s.to_domain.s);
1637
1638                 s.from_user.s=(char*)row_vals[from_user_col].val.string_val;
1639                 s.from_user.len= strlen(s.from_user.s);
1640                 
1641                 s.from_domain.s=(char*)row_vals[from_domain_col].val.string_val;
1642                 s.from_domain.len= strlen(s.from_domain.s);
1643
1644                 s.to_tag.s=(char*)row_vals[totag_col].val.string_val;
1645                 s.to_tag.len= strlen(s.to_tag.s);
1646
1647                 s.from_tag.s=(char*)row_vals[fromtag_col].val.string_val;
1648                 s.from_tag.len= strlen(s.from_tag.s);
1649
1650                 s.callid.s=(char*)row_vals[callid_col].val.string_val;
1651                 s.callid.len= strlen(s.callid.s);
1652
1653                 ev_sname.s= (char*)row_vals[event_col].val.string_val;
1654                 ev_sname.len= strlen(ev_sname.s);
1655                 
1656                 event= contains_event(&ev_sname, &parsed_event);
1657                 if(event== NULL)
1658                 {
1659                         LM_DBG("insert a new event structure in the list waiting"
1660                                         " to be filled in\n");
1661         
1662                         /*insert a new event structure in the list waiting to be filled in*/
1663                         event= (pres_ev_t*)shm_malloc(sizeof(pres_ev_t));
1664                         if(event== NULL)
1665                         {
1666                                 ERR_MEM(SHM_MEM_STR);
1667                         }
1668                         memset(event, 0, sizeof(pres_ev_t));
1669                         event->name.s= (char*)shm_malloc(ev_sname.len* sizeof(char));
1670                         if(event->name.s== NULL)
1671                         {
1672                                 ERR_MEM(SHM_MEM_STR);
1673                         }
1674                         memcpy(event->name.s,ev_sname.s, ev_sname.len);
1675                         event->name.len= ev_sname.len;
1676                         
1677                         event->evp= shm_copy_event(&parsed_event);
1678                         if(event->evp== NULL)
1679                         {
1680                                 LM_ERR("ERROR copying event_t structure\n");
1681                                 goto error;
1682                         }
1683                         event->next= EvList->events;
1684                         EvList->events= event;
1685                 }
1686                 s.event= event;
1687
1688                 s.event_id.s=(char*)row_vals[event_id_col].val.string_val;
1689                 if(s.event_id.s)
1690                         s.event_id.len= strlen(s.event_id.s);
1691
1692                 s.remote_cseq= row_vals[remote_cseq_col].val.int_val;
1693                 s.local_cseq= row_vals[local_cseq_col].val.int_val;
1694                 s.version= row_vals[version_col].val.int_val;
1695                 
1696                 s.expires= expires- (int)time(NULL);
1697                 s.status= row_vals[status_col].val.int_val;
1698
1699                 s.reason.s= (char*)row_vals[reason_col].val.string_val;
1700                 if(s.reason.s)
1701                         s.reason.len= strlen(s.reason.s);
1702
1703                 s.contact.s=(char*)row_vals[contact_col].val.string_val;
1704                 s.contact.len= strlen(s.contact.s);
1705
1706                 s.local_contact.s=(char*)row_vals[local_contact_col].val.string_val;
1707                 s.local_contact.len= strlen(s.local_contact.s);
1708         
1709                 s.record_route.s=(char*)row_vals[record_route_col].val.string_val;
1710                 if(s.record_route.s)
1711                         s.record_route.len= strlen(s.record_route.s);
1712         
1713                 s.sockinfo_str.s=(char*)row_vals[sockinfo_col].val.string_val;
1714                 s.sockinfo_str.len= strlen(s.sockinfo_str.s);
1715
1716                 hash_code= core_hash(&s.pres_uri, &s.event->name, shtable_size);
1717                 if(insert_shtable(subs_htable, hash_code, &s)< 0)
1718                 {
1719                         LM_ERR("adding new record in hash table\n");
1720                         goto error;
1721                 }
1722         }
1723
1724         pa_dbf.free_result(pa_db, res);
1725
1726         /* delete all records */
1727         if(pa_dbf.delete(pa_db, 0,0,0,0)< 0)
1728         {
1729                 LM_ERR("deleting all records from database table\n");
1730                 return -1;
1731         }
1732
1733         return 0;
1734
1735 error:
1736         if(res)
1737                 pa_dbf.free_result(pa_db, res);
1738         return -1;
1739
1740 }
1741
1742 int refresh_watcher(str* pres_uri, str* watcher_uri, str* event, 
1743                 int status, str* reason)
1744 {
1745         unsigned int hash_code;
1746         subs_t* s, *s_copy;
1747         pres_ev_t* ev;          
1748         struct sip_uri uri;
1749         str user, domain;
1750         /* refresh status in subs_htable and send notify */
1751
1752         ev=     contains_event(event, NULL);
1753         if(ev== NULL)
1754         {
1755                 LM_ERR("while searching event in list\n");
1756                 return -1;
1757         }
1758
1759         if(parse_uri(watcher_uri->s, watcher_uri->len, &uri)< 0)
1760         {
1761                 LM_ERR("parsing uri\n");
1762                 return -1;
1763         }
1764         user= uri.user;
1765         domain= uri.host;
1766
1767         hash_code= core_hash(pres_uri, event, shtable_size);
1768
1769         lock_get(&subs_htable[hash_code].lock);
1770
1771         s= subs_htable[hash_code].entries->next;
1772
1773         while(s)
1774         {
1775                 if(s->event== ev && s->pres_uri.len== pres_uri->len &&
1776                         strncmp(s->pres_uri.s, pres_uri->s, pres_uri->len)== 0 &&
1777                         s->from_user.len==user.len && strncmp(s->from_user.s,user.s, user.len)==0 &&
1778                         s->from_domain.len== domain.len && 
1779                         strncmp(s->from_domain.s, domain.s, domain.len)== 0)
1780                 {
1781                         s->status= status;
1782                         if(reason)
1783                                 s->reason= *reason;
1784                         
1785                         s_copy= mem_copy_subs(s, PKG_MEM_TYPE);
1786                         if(s_copy== NULL)
1787                         {
1788                                 LM_ERR("copying subs_t\n");
1789                                 lock_release(&subs_htable[hash_code].lock);
1790                                 return -1;
1791                         }
1792                         lock_release(&subs_htable[hash_code].lock);
1793                         if(notify(s_copy, NULL, NULL, 0)< 0)
1794                         {
1795                                 LM_ERR("in notify function\n");
1796                                 pkg_free(s_copy);
1797                                 return -1;
1798                         }
1799                         pkg_free(s_copy);
1800                         lock_get(&subs_htable[hash_code].lock);
1801                 }
1802                 s= s->next;
1803         }
1804         return 0;
1805 }
1806
1807 int get_db_subs_auth(subs_t* subs, int* found)
1808 {
1809         db_key_t db_keys[5];
1810         db_val_t db_vals[5];
1811         int n_query_cols= 0; 
1812         db_key_t result_cols[3];
1813         db_res_t *result = NULL;
1814         db_row_t *row ; 
1815         db_val_t *row_vals ;
1816
1817         db_keys[n_query_cols] ="presentity_uri";
1818         db_vals[n_query_cols].type = DB_STR;
1819         db_vals[n_query_cols].nul = 0;
1820         db_vals[n_query_cols].val.str_val= subs->pres_uri;
1821         n_query_cols++;
1822
1823         db_keys[n_query_cols] ="watcher_username";
1824         db_vals[n_query_cols].type = DB_STR;
1825         db_vals[n_query_cols].nul = 0;
1826         db_vals[n_query_cols].val.str_val = subs->from_user;
1827         n_query_cols++;
1828
1829         db_keys[n_query_cols] ="watcher_domain";
1830         db_vals[n_query_cols].type = DB_STR;
1831         db_vals[n_query_cols].nul = 0;
1832         db_vals[n_query_cols].val.str_val = subs->from_domain;
1833         n_query_cols++;
1834         
1835         db_keys[n_query_cols] ="event";
1836         db_vals[n_query_cols].type = DB_STR;
1837         db_vals[n_query_cols].nul = 0;
1838         db_vals[n_query_cols].val.str_val = subs->event->name;
1839         n_query_cols++;
1840
1841         result_cols[0] = "status";
1842         result_cols[1] = "reason";
1843
1844         if(pa_dbf.use_table(pa_db, watchers_table)< 0)
1845         {
1846                 LM_ERR("in use table\n");
1847                 return -1;
1848         }       
1849
1850         if(pa_dbf.query(pa_db, db_keys, 0, db_vals, result_cols,
1851                                         n_query_cols, 2, 0, &result )< 0)
1852         {
1853                 LM_ERR("while querying watchers table\n");
1854                 if(result)
1855                         pa_dbf.free_result(pa_db, result);
1856                 return -1;
1857         }
1858         if(result== NULL)
1859                 return -1;
1860         
1861         if(result->n<= 0)
1862         {
1863                 *found= 0;
1864                 pa_dbf.free_result(pa_db, result);
1865                 return 0;
1866         }
1867
1868         *found= 1;
1869         row = &result->rows[0];
1870         row_vals = ROW_VALUES(row);
1871         subs->status= row_vals[0].val.int_val;
1872
1873         if(row_vals[1].val.string_val)
1874         {
1875
1876                 subs->reason.len= strlen(row_vals[1].val.string_val);
1877                 if(subs->reason.len== 0)
1878                         subs->reason.s= NULL;
1879                 else
1880                 {
1881                         subs->reason.s= (char*)pkg_malloc(subs->reason.len*sizeof(char));
1882                         if(subs->reason.s== NULL)
1883                         {
1884                                 pa_dbf.free_result(pa_db, result);
1885                                 ERR_MEM(PKG_MEM_STR);
1886                         }               
1887                         memcpy(subs->reason.s, row_vals[1].val.string_val, subs->reason.len);
1888                 }
1889         }
1890         
1891         pa_dbf.free_result(pa_db, result);
1892         return 0;
1893 error:
1894         return -1;
1895 }       
1896
1897 int insert_db_subs_auth(subs_t* subs)
1898 {
1899         db_key_t db_keys[10];
1900         db_val_t db_vals[10];
1901         int n_query_cols= 0; 
1902
1903         db_keys[n_query_cols] ="presentity_uri";
1904         db_vals[n_query_cols].type = DB_STR;
1905         db_vals[n_query_cols].nul = 0;
1906         db_vals[n_query_cols].val.str_val= subs->pres_uri;
1907         n_query_cols++;
1908
1909         db_keys[n_query_cols] ="watcher_username";
1910         db_vals[n_query_cols].type = DB_STR;
1911         db_vals[n_query_cols].nul = 0;
1912         db_vals[n_query_cols].val.str_val = subs->from_user;
1913         n_query_cols++;
1914
1915         db_keys[n_query_cols] ="watcher_domain";
1916         db_vals[n_query_cols].type = DB_STR;
1917         db_vals[n_query_cols].nul = 0;
1918         db_vals[n_query_cols].val.str_val = subs->from_domain;
1919         n_query_cols++;
1920         
1921         db_keys[n_query_cols] ="event";
1922         db_vals[n_query_cols].type = DB_STR;
1923         db_vals[n_query_cols].nul = 0;
1924         db_vals[n_query_cols].val.str_val = subs->event->name;
1925         n_query_cols++;
1926
1927         db_keys[n_query_cols] ="status";
1928         db_vals[n_query_cols].type = DB_INT;
1929         db_vals[n_query_cols].nul = 0;
1930         db_vals[n_query_cols].val.int_val = subs->status;
1931         n_query_cols++;
1932                                                                 
1933         db_keys[n_query_cols] = "inserted_time";
1934         db_vals[n_query_cols].type = DB_INT;
1935         db_vals[n_query_cols].nul = 0;
1936         db_vals[n_query_cols].val.int_val= (int)time(NULL);
1937         n_query_cols++;
1938         
1939         if(subs->reason.s && subs->reason.len)
1940         {
1941                 db_keys[n_query_cols] ="reason";
1942                 db_vals[n_query_cols].type = DB_STR;
1943                 db_vals[n_query_cols].nul = 0;
1944                 db_vals[n_query_cols].val.str_val = subs->reason;
1945                 n_query_cols++; 
1946         }       
1947         
1948         if (pa_dbf.use_table(pa_db, watchers_table) < 0) 
1949         {
1950                 LM_ERR("in use_table\n");
1951                 return -1;
1952         }
1953
1954         if(pa_dbf.insert(pa_db, db_keys, db_vals, n_query_cols )< 0)
1955         {       
1956                 LM_ERR("in sql insert\n");
1957                 return -1;
1958         }
1959
1960         return 0;
1961 }