all: updated FSF address in GPL text
[sip-router] / modules / rls / resource_notify.c
1 /*
2  * $Id: resource_notify.c 2230 2007-06-06 07:13:20Z anca_vamanu $
3  *
4  * rls module - resource list server
5  *
6  * Copyright (C) 2007 Voice Sistem S.R.L.
7  *
8  * This file is part of Kamailio, a free SIP server.
9  *
10  * Kamailio is free software; you can redistribute it and/or modify
11  * it under the terms of the GNU General Public License as published by
12  * the Free Software Foundation; either version 2 of the License, or
13  * (at your option) any later version
14  *
15  * Kamailio is distributed in the hope that it will be useful,
16  * but WITHOUT ANY WARRANTY; without even the implied warranty of
17  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
18  * GNU General Public License for more details.
19  *
20  * You should have received a copy of the GNU General Public License 
21  * along with this program; if not, write to the Free Software 
22  * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301  USA
23  *
24  * History:
25  * --------
26  *  2007-09-11  initial version (anca)
27  */
28
29 #include <stdio.h>
30 #include <stdlib.h>
31 #include <libxml/parser.h>
32 #include "../../parser/parse_content.h"
33 #include "../../parser/parse_from.h"
34 #include "../../lib/kcore/cmpapi.h"
35 #include "../../hashes.h"
36 #include "../../trim.h"
37 #include "../pua/hash.h"
38 #include "rls.h"
39 #include "notify.h"
40 #include "resource_notify.h"
41
42 /* how to relate resource oriented dialogs to list_uri */
43 /* sol1: use the same callid in Subscribe requests 
44  * sol2: include an extra header
45  * sol3: put the list_uri as the id of the record stored in
46  * pua and write a function to return that id
47  * winner: sol3
48  * */
49 static str su_200_rpl     = str_init("OK");
50 static str pu_400_rpl     = str_init("Bad Request");
51 static str pu_481_rpl     = str_init("Call/Transaction Does Not Exist");
52 static str pu_489_rpl     = str_init("Bad Event");
53 static str pu_500_rpl     = str_init("Server Internal Error");
54
55 int subset = 0;
56
57 int parse_rlsubs_did(char* str_did, str* callid, str* from_tag, str* to_tag)
58 {
59         char* smc= NULL;
60
61         smc= strstr(str_did, RLS_DID_SEP);
62     if(smc== NULL)
63     {
64         LM_ERR("bad format for resource list Subscribe dialog"
65             " indentifier[rlsubs did]= %s\n", str_did);
66         return -1;
67     }
68         callid->s= str_did;
69         callid->len= smc- str_did;
70                         
71         from_tag->s= smc+ RLS_DID_SEP_LEN;
72         smc= strstr(from_tag->s, RLS_DID_SEP);
73         if(smc== NULL)
74     {
75         LM_ERR("bad format for resource list Subscribe dialog"
76             " indentifier(rlsubs did)= %s\n", str_did);
77         return -1;
78     }
79         from_tag->len= smc- from_tag->s;
80                 
81         to_tag->s= smc+ RLS_DID_SEP_LEN;
82         to_tag->len= strlen(str_did)- 2* RLS_DID_SEP_LEN- callid->len- from_tag->len;
83
84         return 0;
85 }
86
87
88 void get_dialog_from_did(char* did, subs_t **dialog, unsigned int *hash_code)
89 {
90         str callid, to_tag, from_tag;
91         subs_t* s;
92
93         *dialog= NULL;
94
95         /* search the subscription in rlsubs_table*/            
96         if( parse_rlsubs_did(did, &callid, &from_tag, &to_tag)< 0)
97         {
98                 LM_ERR("bad format for resource list Subscribe dialog "
99                         "indentifier(rlsubs did)\n");
100                 return;
101         }
102
103         if (dbmode == RLS_DB_ONLY)
104         {
105                 *dialog = get_dialog_notify_rlsdb(callid,to_tag,from_tag);
106
107                 if(*dialog==NULL)
108                 {
109                         LM_INFO("record not retrieved from db [rlsubs_did]= %s\n", did);
110                         return;
111                 }
112         }
113         else
114         {
115                 *hash_code= core_hash(&callid, &to_tag, hash_size);
116
117                 lock_get(&rls_table[*hash_code].lock);
118                 s= pres_search_shtable(rls_table,callid,to_tag,from_tag,*hash_code);
119
120                 if(s== NULL)
121                 {
122                         LM_INFO("record not found in hash_table [rlsubs_did]= %s\n",
123                                         did);
124                         lock_release(&rls_table[*hash_code].lock);
125                         return;
126                 }
127
128                 /* save dialog info */
129                 *dialog= pres_copy_subs(s, PKG_MEM_TYPE);
130                 if(*dialog== NULL)
131                 {
132                         LM_ERR("while copying subs_t structure\n");
133                         lock_release(&rls_table[*hash_code].lock);
134                         return;
135                 }
136         }
137
138         if ((*dialog)->expires < (int)time(NULL))
139                 (*dialog)->expires = 0;
140         else
141                 (*dialog)->expires -= (int)time(NULL);
142
143         if (dbmode != RLS_DB_ONLY)
144                 lock_release(&rls_table[*hash_code].lock);
145
146 }
147
148 int send_notify(xmlDocPtr * rlmi_doc, char * buf, int buf_len, 
149                  const str bstr, subs_t * dialog, unsigned int hash_code)
150 {
151     int result = 0;
152     str rlmi_cont= {0, 0}, multi_cont;
153
154     xmlDocDumpFormatMemory(*rlmi_doc,(xmlChar**)(void*)&rlmi_cont.s,
155                                 &rlmi_cont.len, 0);
156                 
157     multi_cont.s= buf;
158     multi_cont.len= buf_len;
159
160     result =agg_body_sendn_update(&dialog->pres_uri, bstr.s, &rlmi_cont, 
161                  (buf_len==0)?NULL:&multi_cont, dialog, hash_code);
162     xmlFree(rlmi_cont.s);
163     xmlFreeDoc(*rlmi_doc);
164     *rlmi_doc= NULL;
165     return result;
166 }
167
168
169 static void send_notifies(db1_res_t *result, int did_col, int resource_uri_col, int auth_state_col, int reason_col,
170                    int pres_state_col, int content_type_col)
171 {
172         int i;
173         char* prev_did= NULL, * curr_did= NULL;
174         db_row_t *row;  
175         db_val_t *row_vals;
176         char* resource_uri;
177         str pres_state = {0, 0};
178         xmlDocPtr rlmi_doc= NULL;
179         xmlNodePtr list_node= NULL, instance_node= NULL, resource_node;
180         unsigned int hash_code= 0;
181         int size= BUF_REALLOC_SIZE, buf_len= 0; 
182         char* buf= NULL, *auth_state= NULL, *boundary_string= NULL;
183         str cid = {0,0};
184         str content_type= {0, 0};
185         int auth_state_flag;
186         int chunk_len=0;
187         str bstr= {0, 0};
188         subs_t* dialog= NULL;
189         int len_est = 0;
190         int resource_added = 0; /* Flag to indicate that we have added at least one resource */
191
192         /* generate the boundary string */
193         boundary_string= generate_string(BOUNDARY_STRING_LEN);
194         bstr.len= strlen(boundary_string);
195         bstr.s= (char*)pkg_malloc((bstr.len+ 1)* sizeof(char));
196         if(bstr.s== NULL)
197         {
198                 ERR_MEM(PKG_MEM_STR);
199         }
200         memcpy(bstr.s, boundary_string, bstr.len);
201         bstr.s[bstr.len]= '\0';
202
203         /* Allocate an initial buffer for the multipart body.
204          * This buffer will be reallocated if neccessary */
205         buf= pkg_malloc(size* sizeof(char));
206         if(buf== NULL)
207         {
208                 ERR_MEM(PKG_MEM_STR);
209         }
210
211         if (dbmode == RLS_DB_ONLY && rls_dbf.start_transaction)
212         {
213                 if (rls_dbf.start_transaction(rls_db, DB_LOCKING_WRITE) < 0)
214                 {
215                         LM_ERR("in start_transaction\n");
216                         goto error;
217                 }
218         }
219
220         LM_DBG("found %d records with updated state\n", result->n);
221         for(i= 0; i< result->n; i++)
222         {
223                 row = &result->rows[i];
224                 row_vals = ROW_VALUES(row);
225                 
226                 curr_did=     (char*)row_vals[did_col].val.string_val;
227                 resource_uri= (char*)row_vals[resource_uri_col].val.string_val;
228                 auth_state_flag=     row_vals[auth_state_col].val.int_val;
229                 pres_state.s=   (char*)row_vals[pres_state_col].val.string_val;
230                 pres_state.len = strlen(pres_state.s);
231                 trim(&pres_state);
232                 
233                 /* If we have moved onto a new resource list Subscribe dialog indentifier, 
234                    send a NOTIFY for the previous ID and then drop the existing documents. */
235                 if(prev_did!= NULL && strcmp(prev_did, curr_did)) 
236                 {
237                         if (send_notify(&rlmi_doc, buf, buf_len, bstr, dialog, hash_code))
238                         {  
239                                 LM_ERR("in send_notify\n");
240                                 goto error;
241                         }
242                         len_est = 0;
243                         pkg_free(dialog);
244                         dialog= NULL;
245                 }
246
247                 /*if first or different*/
248                 if(prev_did==NULL || strcmp(prev_did, curr_did)!=0)
249                 {
250                         /* Work out a subscription from the did. */
251                         get_dialog_from_did(curr_did, &dialog, &hash_code);
252                         if(dialog== NULL)
253                         {
254                                 prev_did = NULL;
255                                 LM_INFO("Dialog is NULL\n");
256                                 continue;
257                         }
258                 
259                         len_est = create_empty_rlmi_doc(&rlmi_doc, &list_node, &dialog->pres_uri, dialog->version, 0);
260                         len_est += 2*strlen(boundary_string)+4+102+2+50+strlen(resource_uri)+20;
261                         buf_len= 0;
262                         resource_added = 0;
263
264                         /* !!!! for now I will include the auth state without checking if 
265                          * it has changed - > in future chech if it works */            
266                 }
267
268                 /* add a node in rlmi_doc and if any presence state registered add 
269                  * it in the buffer */
270                 
271                 resource_node= xmlNewChild(list_node,NULL,BAD_CAST "resource", NULL);
272                 if(resource_node== NULL)
273                 {
274                         LM_ERR("when adding resource child\n");
275                         goto done;
276                 }
277                 xmlNewProp(resource_node, BAD_CAST "uri", BAD_CAST resource_uri);
278                 len_est += strlen (resource_uri) + 35; /* <resource uri="[uri]"></resource>/r/n */
279                 resource_added = 1;
280
281                 /* there might be more records with the same uri- more instances-
282                  * search and add them all */
283                 
284                 while(1)
285                 {
286                         cid.s= NULL;
287                         cid.len= 0;
288                         
289                         auth_state= get_auth_string(auth_state_flag);
290                         if(auth_state== NULL)
291                         {
292                                 LM_ERR("bad authorization status flag\n");
293                                 goto error;
294                         }       
295                         len_est += strlen(auth_state) + 38; /* <instance id="12345678" state="[auth_state]" />r/n */
296
297                         if(auth_state_flag & ACTIVE_STATE)
298                         {
299                                 cid.s= generate_cid(resource_uri, strlen(resource_uri));
300                                 cid.len = strlen(cid.s);
301                                 len_est += cid.len + 8; /* cid="[cid]" */
302                                 content_type.s = (char*)row_vals[content_type_col].val.string_val;
303                                 content_type.len = strlen(content_type.s);
304                                 chunk_len = 4 + bstr.len
305                                                         + 35
306                                                         + 16 + cid.len
307                                                         + 18 + content_type.len
308                                                         + 4 + pres_state.len + 8;
309                                 len_est += chunk_len;
310                         }
311                         else
312                         if(auth_state_flag & TERMINATED_STATE)
313                         {
314                                 len_est += strlen(row_vals[resource_uri_col].val.string_val) + 10; /* reason="[resaon]" */
315                         }
316             
317                         if (rls_max_notify_body_len > 0 && len_est > rls_max_notify_body_len)
318                         {
319                                 /* We have a limit on body length set, and we were about to exceed it */
320                                 if (resource_added == 1)
321                                 {
322                                         /* We added at least one resource. */
323                                         LM_DBG("timer_send_notify hit the size limit. len_est = %d\n", len_est);
324                                         if (send_notify(&rlmi_doc, buf, buf_len, bstr, dialog, hash_code))
325                                         {
326                                                 LM_ERR("in send_notify\n");
327                                                 goto error;
328                                         }
329                                         i --;
330                                 }
331                                 else
332                                 {
333                                         LM_DBG("timer_send_notify hit the size limit. NO RESOURCE ADDED len_est = %d\n", len_est);
334                                 }
335                                 len_est = 0;
336
337                                 pkg_free(dialog);
338                                 dialog= NULL;
339                                 curr_did=NULL;
340                                 break;
341                         }
342
343                         /* OK, we are happy this will fit */
344                         instance_node= xmlNewChild(resource_node, NULL, BAD_CAST "instance", NULL);
345                         if(instance_node== NULL)
346                         {
347                                 LM_ERR("while adding instance child\n");
348                                 goto error;
349                         }       
350
351                         /* Instance ID should be unique for each instance node
352                            within a resource node.  The same instance ID can be
353                            used in different resource nodes.  Instance ID needs
354                            to remain the same for each resource instance in
355                            future updates.  We can just use a common string
356                            here because you will only get multiple instances
357                            for a resource when the back-end SUBSCRIBE is forked
358                            and pua does not support this.  If/when pua supports
359                            forking of the SUBSCRIBEs it sends this will need to
360                            be fixed properly. */
361                         xmlNewProp(instance_node, BAD_CAST "id", 
362                                         BAD_CAST instance_id);
363                         if(auth_state_flag & ACTIVE_STATE)
364                         {
365                                 xmlNewProp(instance_node, BAD_CAST "state", BAD_CAST auth_state);
366                         }
367                         else
368                         if(auth_state_flag & TERMINATED_STATE)
369                         {
370                                 xmlNewProp(instance_node, BAD_CAST "reason",
371                                                 BAD_CAST row_vals[resource_uri_col].val.string_val);
372                         }
373                         xmlNewProp(instance_node, BAD_CAST "cid", BAD_CAST cid.s);
374
375                         /* add in the multipart buffer */
376                         if(cid.s)
377                         {
378         
379                                 while(buf_len + chunk_len >= size)
380                                 {
381                                         REALLOC_BUF
382                                 }
383                                 buf_len+= sprintf(buf+ buf_len, "--%.*s\r\n", bstr.len,
384                                                 bstr.s);
385                                 buf_len+= sprintf(buf+ buf_len,
386                                                 "Content-Transfer-Encoding: binary\r\n");
387                                 buf_len+= sprintf(buf+ buf_len, "Content-ID: <%.*s>\r\n",
388                                                 cid.len, cid.s);
389                                 buf_len+= sprintf(buf+ buf_len, "Content-Type: %.*s\r\n\r\n",
390                                                 content_type.len, content_type.s);
391                                 buf_len+= sprintf(buf+buf_len,"%.*s\r\n\r\n", pres_state.len,
392                                                 pres_state.s);
393                         }
394
395                         i++;
396                         if(i== result->n)
397                         {
398                                 i--;
399                                 break;
400                         }
401         
402                         row = &result->rows[i];
403                         row_vals = ROW_VALUES(row);
404
405                         if(strncmp(resource_uri, row_vals[resource_uri_col].val.string_val,
406                                         strlen(resource_uri))
407                                 || strncmp(curr_did, row_vals[did_col].val.string_val,
408                                         strlen(curr_did)))
409                         {
410                                 i--;
411                                 break;
412                         }
413                         resource_uri= (char*)row_vals[resource_uri_col].val.string_val;
414                         auth_state_flag=     row_vals[auth_state_col].val.int_val;
415                         pres_state.s=   (char*)row_vals[pres_state_col].val.string_val;
416                         pres_state.len= strlen(pres_state.s);
417                         trim(&pres_state);
418                 }
419
420                 prev_did= curr_did;
421         }
422
423         if(rlmi_doc)
424         {
425                 LM_DBG("timer_send_notify at end len_est = %d resource_added = %d\n", len_est, resource_added);
426                 if (resource_added == 1)
427                 {
428                         send_notify(&rlmi_doc, buf, buf_len, bstr, dialog, hash_code);
429                 }
430                 if(dialog)
431                 {
432                         pkg_free(dialog);
433                 }
434                 dialog= NULL;
435         }
436
437 done:
438         if (dbmode == RLS_DB_ONLY && rls_dbf.end_transaction)
439         {
440                 if (rls_dbf.end_transaction(rls_db) < 0)
441                 {
442                         LM_ERR("in end_transaction\n");
443                         goto error;
444                 }
445         }
446
447 error:
448         if(bstr.s)
449                 pkg_free(bstr.s);
450
451         if(buf)
452                 pkg_free(buf);
453         if(dialog)
454                 pkg_free(dialog);
455
456         if (dbmode == RLS_DB_ONLY && rls_dbf.abort_transaction)
457         {
458                 if (rls_dbf.abort_transaction(rls_db) < 0)
459                         LM_ERR("in abort_transaction\n");
460         }
461
462         return;
463 }
464
465
466 int parse_subs_state(str auth_state, str *reason, int *expires)
467 {
468         str str_exp;
469         char* smc= NULL;
470         int len, flag= -1;
471
472         if (strncmp(auth_state.s, "active", 6)== 0)
473                 flag= ACTIVE_STATE;
474
475         if (strncmp(auth_state.s, "pending", 7)== 0)
476                 flag= PENDING_STATE; 
477
478         if (strncmp(auth_state.s, "terminated", 10)== 0)
479         {
480                 smc= strchr(auth_state.s, ';');
481                 if (smc== NULL)
482                 {
483                         LM_ERR("terminated state and no reason found");
484                         return -1;
485                 }
486                 if (strncmp(smc+1, "reason=", 7))
487                 {
488                         LM_ERR("terminated state and no reason found");
489                         return -1;
490                 }
491                 len=  auth_state.len- 10- 1- 7;
492                 reason->s = (char*) pkg_malloc(len* sizeof(char));
493                 if (reason->s== NULL)
494                 {
495                         ERR_MEM(PKG_MEM_STR);
496                 }
497                 memcpy(reason->s, smc+ 8, len);
498                 reason->len= len;
499                 return TERMINATED_STATE;
500         }
501         
502         if(flag> 0)
503         {
504                 smc= strchr(auth_state.s, ';');
505                 if(smc== NULL)
506                 {
507                         LM_ERR("active or pending state and no expires parameter found");
508                         return -1;
509                 }       
510                 if(strncmp(smc+1, "expires=", 8))
511                 {
512                         LM_ERR("active or pending state and no expires parameter found");
513                         return -1;
514                 }
515                 str_exp.s= smc+ 9;
516                 str_exp.len= auth_state.s+ auth_state.len- smc- 9;
517
518                 if( str2int(&str_exp, (unsigned int*)expires)< 0)
519                 {
520                         LM_ERR("while getting int from str\n");
521                         return -1;
522                 }
523                 return flag;
524         
525         }
526
527 error:
528         if (reason->s) pkg_free(reason->s);
529         return -1;
530 }
531
532 int rls_handle_notify(struct sip_msg* msg, char* c1, char* c2)
533 {
534         struct to_body *pto, TO = {0}, *pfrom = NULL;
535         str body= {0, 0};
536         ua_pres_t dialog;
537         str* res_id= NULL;
538         db_key_t query_cols[8];
539         db_val_t query_vals[8];
540         int n_query_cols= 0;
541         str auth_state= {0, 0};
542         int found= 0;
543         str reason = {0, 0};
544         int auth_flag;
545         struct hdr_field* hdr= NULL;
546         int expires= -1;
547         str content_type= {0, 0};
548         int reply_code = 500;
549         str reply_str = pu_500_rpl;
550
551         LM_DBG("start\n");
552         /* extract the dialog information and check if an existing dialog*/     
553         if( parse_headers(msg,HDR_EOH_F, 0)==-1 )
554         {
555                 LM_ERR("parsing headers\n");
556                 reply_code = 400;
557                 reply_str = pu_400_rpl;
558                 goto error;
559         }
560         if((!msg->event ) ||(msg->event->body.len<=0))
561         {
562                 LM_ERR("Missing event header field value\n");
563                 reply_code = 400;
564                 reply_str = pu_400_rpl;
565                 goto error;
566         }
567         if( msg->to==NULL || msg->to->body.s==NULL)
568         {
569                 LM_ERR("cannot parse TO header\n");
570                 reply_code = 400;
571                 reply_str = pu_400_rpl;
572                 goto error;
573         }
574         if(msg->to->parsed != NULL)
575         {
576                 pto = (struct to_body*)msg->to->parsed;
577                 LM_DBG("'To' header ALREADY PARSED: <%.*s>\n",
578                                 pto->uri.len, pto->uri.s );     
579         }
580         else
581         {
582                 parse_to(msg->to->body.s,msg->to->body.s + msg->to->body.len + 1, &TO);
583                 if(TO.uri.len <= 0) 
584                 {
585                         LM_ERR(" 'To' header NOT parsed\n");
586                         reply_code = 400;
587                         reply_str = pu_400_rpl;
588                         goto error;
589                 }
590                 pto = &TO;
591         }
592         memset(&dialog, 0, sizeof(ua_pres_t));
593         dialog.watcher_uri= &pto->uri;
594         if (pto->tag_value.s==NULL || pto->tag_value.len==0 )
595         {
596                 LM_ERR("to tag value not parsed\n");
597                 reply_code = 400;
598                 reply_str = pu_400_rpl;
599                 goto error;
600         }
601         dialog.from_tag= pto->tag_value;
602         if( msg->callid==NULL || msg->callid->body.s==NULL)
603         {
604                 LM_ERR("cannot parse callid header\n");
605                 reply_code = 400;
606                 reply_str = pu_400_rpl;
607                 goto error;
608         }
609         dialog.call_id = msg->callid->body;
610
611         if (!msg->from || !msg->from->body.s)
612         {
613                 LM_ERR("cannot find 'from' header!\n");
614                 reply_code = 400;
615                 reply_str = pu_400_rpl;
616                 goto error;
617         }
618         if (msg->from->parsed == NULL)
619         {
620                 LM_DBG("'From' header not parsed\n");
621                 /* parsing from header */
622                 if ( parse_from_header( msg )<0 ) 
623                 {
624                         LM_ERR("cannot parse From header\n");
625                         reply_code = 400;
626                         reply_str = pu_400_rpl;
627                         goto error;
628                 }
629         }
630         pfrom = (struct to_body*)msg->from->parsed;
631         dialog.pres_uri= &pfrom->uri;
632
633         if( pfrom->tag_value.s ==NULL || pfrom->tag_value.len == 0)
634         {
635                 LM_ERR("no from tag value present\n");
636                 reply_code = 400;
637                 reply_str = pu_400_rpl;
638                 goto error;
639         }
640         dialog.to_tag= pfrom->tag_value;
641         dialog.flag|= RLS_SUBSCRIBE;
642
643         dialog.event= get_event_flag(&msg->event->body);
644         if(dialog.event< 0)
645         {
646                 LM_ERR("unrecognized event package\n");
647                 reply_code = 489;
648                 reply_str = pu_489_rpl;
649                 goto error;
650         }
651
652         /* extract the subscription state */
653         hdr = msg->headers;
654         while (hdr!= NULL)
655         {
656                 if(cmp_hdrname_strzn(&hdr->name, "Subscription-State", 18)==0)  
657                 {
658                         found = 1;
659                         break;
660                 }
661                 hdr = hdr->next;
662         }
663         if(found==0 )
664         {
665                 LM_ERR("'Subscription-State' header not found\n");
666                 goto error;
667         }
668         auth_state = hdr->body;
669
670         /* extract state and reason */
671         auth_flag= parse_subs_state(auth_state, &reason, &expires);
672         if(auth_flag< 0)
673         {
674                 LM_ERR("while parsing 'Subscription-State' header\n");
675                 goto error;
676         }
677         if(pua_get_record_id(&dialog, &res_id)< 0) /* verify if within a stored dialog */
678         {
679                 LM_ERR("occured when trying to get record id\n");
680                 goto error;
681         }
682         if(res_id==0)
683         {
684                 LM_DBG("presence dialog record not found\n");
685                 /* if it is a NOTIFY for a terminated SUBSCRIBE dialog in RLS, then
686                  * the module might not have the dialog structure anymore
687                  * - just send 200ok, it is harmless
688                  */
689                 if(auth_flag==TERMINATED_STATE)
690                         goto done;
691                 LM_INFO("no presence dialog record for non-TERMINATED state uri pres_uri = %.*s watcher_uri = %.*s\n",
692                 dialog.pres_uri->len, dialog.pres_uri->s, dialog.watcher_uri->len, dialog.watcher_uri->s);
693                 reply_code = 481;
694                 reply_str = pu_481_rpl;
695                 goto error;
696         }
697                 
698         if(msg->content_type== NULL || msg->content_type->body.s== NULL)
699         {
700                 LM_DBG("cannot find content type header header\n");
701         }
702         else
703                 content_type= msg->content_type->body;
704                                         
705         /*constructing the xml body*/
706         if(get_content_length(msg) == 0 )
707         {       
708                 goto done;
709         }       
710         else
711         {
712                 if(content_type.s== 0)
713                 {
714                         LM_ERR("content length != 0 and no content type header found\n");
715                         goto error;
716                 }
717                 body.s=get_body(msg);
718                 if (body.s== NULL) 
719                 {
720                         LM_ERR("cannot extract body from msg\n");
721                         goto error;
722                 }
723                 body.len = get_content_length( msg );
724
725         }
726         /* update in rlpres_table where rlsusb_did= res_id and resource_uri= from_uri*/
727
728         LM_DBG("body= %.*s\n", body.len, body.s);
729
730         query_cols[n_query_cols]= &str_rlsubs_did_col;
731         query_vals[n_query_cols].type = DB1_STR;
732         query_vals[n_query_cols].nul = 0;
733         query_vals[n_query_cols].val.str_val= *res_id; 
734         n_query_cols++;
735
736         query_cols[n_query_cols]= &str_resource_uri_col;
737         query_vals[n_query_cols].type = DB1_STR;
738         query_vals[n_query_cols].nul = 0;
739         query_vals[n_query_cols].val.str_val= *dialog.pres_uri; 
740         n_query_cols++;
741
742         query_cols[n_query_cols]= &str_updated_col;
743         query_vals[n_query_cols].type = DB1_INT;
744         query_vals[n_query_cols].nul = 0;
745         if (dbmode == RLS_DB_ONLY)
746                 query_vals[n_query_cols].val.int_val=
747                         core_hash(res_id, NULL, 0) %
748                                 (waitn_time * rls_notifier_poll_rate
749                                         * rls_notifier_processes);
750         else
751                 query_vals[n_query_cols].val.int_val = UPDATED_TYPE;
752         n_query_cols++;
753                 
754         query_cols[n_query_cols]= &str_auth_state_col;
755         query_vals[n_query_cols].type = DB1_INT;
756         query_vals[n_query_cols].nul = 0;
757         query_vals[n_query_cols].val.int_val= auth_flag; 
758         n_query_cols++;
759
760         query_cols[n_query_cols]= &str_reason_col;
761         query_vals[n_query_cols].type = DB1_STR;
762         query_vals[n_query_cols].nul = 0;
763         if(reason.len > 0)
764         {
765                 query_vals[n_query_cols].val.str_val.s= reason.s;
766                 query_vals[n_query_cols].val.str_val.len= reason.len;
767         }       
768         else
769         {
770                 query_vals[n_query_cols].val.str_val.s = "";
771                 query_vals[n_query_cols].val.str_val.len = 0;
772         }
773         n_query_cols++;
774
775         query_cols[n_query_cols]= &str_content_type_col;
776         query_vals[n_query_cols].type = DB1_STR;
777         query_vals[n_query_cols].nul = 0;
778         query_vals[n_query_cols].val.str_val= content_type;
779         n_query_cols++;
780                         
781         query_cols[n_query_cols]= &str_presence_state_col;
782         query_vals[n_query_cols].type = DB1_STR;
783         query_vals[n_query_cols].nul = 0;
784         query_vals[n_query_cols].val.str_val= body;
785         n_query_cols++;
786                 
787         query_cols[n_query_cols]= &str_expires_col;
788         query_vals[n_query_cols].type = DB1_INT;
789         query_vals[n_query_cols].nul = 0;
790         query_vals[n_query_cols].val.int_val= expires+ (int)time(NULL);
791         n_query_cols++;
792
793         if (rlpres_dbf.use_table(rlpres_db, &rlpres_table) < 0) 
794         {
795                 LM_ERR("in use_table\n");
796                 goto error;
797         }
798
799         if (dbmode == RLS_DB_ONLY && rlpres_dbf.start_transaction)
800         {
801                 if (rlpres_dbf.start_transaction(rlpres_db, DB_LOCKING_WRITE) < 0)
802                 {
803                         LM_ERR("in start_transaction\n");
804                         goto error;
805                 }
806         }
807
808         if (rlpres_dbf.replace != NULL)
809         {
810                 if(rlpres_dbf.replace(rlpres_db, query_cols, query_vals, n_query_cols,
811                                         2, 0)< 0)
812                 {
813                         LM_ERR("in sql replace\n");
814                         goto error;
815                 }
816                 LM_DBG("Inserted/replace in database table new record\n");
817         }
818         else
819         {
820                 if(rlpres_dbf.update(rlpres_db, query_cols, 0, query_vals, query_cols+2,
821                                                 query_vals+2, 2, n_query_cols-2)< 0)
822                 {
823                         LM_ERR("in sql update\n");
824                         goto error;
825                 }
826
827                 if (rlpres_dbf.affected_rows(rlpres_db) == 0)
828                 {
829                         if(rlpres_dbf.insert(rlpres_db, query_cols, query_vals, n_query_cols)< 0)
830                         {
831                                 LM_ERR("in sql insert\n");
832                                 goto error;
833                         }
834                         LM_DBG("Inserted in database table new record\n");
835                 }
836         }
837
838         if (dbmode == RLS_DB_ONLY && rlpres_dbf.end_transaction)
839         {
840                 if (rlpres_dbf.end_transaction(rlpres_db) < 0)
841                 {
842                         LM_ERR("in end_transaction\n");
843                         goto error;
844                 }
845         }
846
847         LM_DBG("Updated rlpres_table\n");       
848         /* reply 200OK */
849 done:
850         if(slb.freply(msg, 200, &su_200_rpl) < 0)
851         {
852                 LM_ERR("while sending reply\n");
853                 goto error;
854         }
855
856         if(res_id!=NULL)
857         {
858                 pkg_free(res_id->s);
859                 pkg_free(res_id);
860         }
861
862         if (reason.s) pkg_free(reason.s);
863
864         free_to_params(&TO);
865         return 1;
866
867 error:
868         if(slb.freply(msg, reply_code, &reply_str) < 0)
869         {
870                 LM_ERR("failed sending reply\n");
871         }
872         if(res_id!=NULL)
873         {
874                 pkg_free(res_id->s);
875                 pkg_free(res_id);
876         }
877
878         if (reason.s) pkg_free(reason.s);
879
880         free_to_params(&TO);
881
882         if (dbmode == RLS_DB_ONLY && rlpres_dbf.abort_transaction)
883         {
884                 if (rlpres_dbf.abort_transaction(rlpres_db) < 0)
885                         LM_ERR("in abort_transaction\n");
886         }
887
888         return -1;
889 }
890
891 #define EXTRACT_STRING(strng, chars)\
892                         do {\
893                         strng.s = (char *) chars;\
894                         strng.len = strlen(strng.s);\
895                         } while(0);
896
897 static void timer_send_full_state_notifies(int round)
898 {
899         db_key_t query_cols[1], result_cols[22], update_cols[1];
900         db_val_t query_vals[1], update_vals[1], *values;
901         db_row_t *rows;
902         db1_res_t *result = NULL;
903         int n_result_cols = 0, i;
904         int pres_uri_col, tuser_col, tdomain_col, fuser_col, fdomain_col;
905         int wuser_col, wdomain_col, callid_col, to_tag_col, from_tag_col;
906         int sockinfo_col, lcontact_col, contact_col, rroute_col, event_id_col;
907         int reason_col, event_col, lcseq_col, rcseq_col, status_col;
908         int version_col, expires_col;
909         subs_t sub;
910         str ev_sname;
911         event_t parsed_event;
912         xmlDocPtr doc = NULL;
913         xmlNodePtr service_node = NULL;
914         int now = (int)time(NULL);
915         db_query_f query_fn = rls_dbf.query_lock ? rls_dbf.query_lock : rls_dbf.query;
916
917         query_cols[0] = &str_updated_col;
918         query_vals[0].type = DB1_INT;
919         query_vals[0].nul = 0;
920         query_vals[0].val.int_val = round;
921
922         result_cols[pres_uri_col = n_result_cols++] = &str_presentity_uri_col;
923         result_cols[tuser_col = n_result_cols++] = &str_to_user_col;
924         result_cols[tdomain_col = n_result_cols++] = &str_to_domain_col;
925         result_cols[fuser_col = n_result_cols++] = &str_from_user_col;
926         result_cols[fdomain_col = n_result_cols++] = &str_from_domain_col;
927         result_cols[wuser_col = n_result_cols++] = &str_watcher_username_col;
928         result_cols[wdomain_col = n_result_cols++] = &str_watcher_domain_col;
929         result_cols[callid_col = n_result_cols++] = &str_callid_col;
930         result_cols[to_tag_col = n_result_cols++] = &str_to_tag_col;
931         result_cols[from_tag_col = n_result_cols++] = &str_from_tag_col;
932         result_cols[sockinfo_col = n_result_cols++] = &str_socket_info_col;
933         result_cols[lcontact_col = n_result_cols++] = &str_local_contact_col;
934         result_cols[contact_col = n_result_cols++] = &str_contact_col;
935         result_cols[rroute_col = n_result_cols++] = &str_record_route_col;
936         result_cols[event_id_col = n_result_cols++] = &str_event_id_col;
937         result_cols[reason_col = n_result_cols++] = &str_reason_col;
938         result_cols[event_col = n_result_cols++] = &str_event_col;
939         result_cols[lcseq_col = n_result_cols++] = &str_local_cseq_col;
940         result_cols[rcseq_col = n_result_cols++] = &str_remote_cseq_col;
941         result_cols[status_col = n_result_cols++] = &str_status_col;
942         result_cols[version_col = n_result_cols++] = &str_version_col;
943         result_cols[expires_col = n_result_cols++] = &str_expires_col;
944
945         update_cols[0] = &str_updated_col;
946         update_vals[0].type = DB1_INT;
947         update_vals[0].nul = 0;
948         update_vals[0].val.int_val = NO_UPDATE_TYPE;
949
950         if (rls_dbf.use_table(rls_db, &rlsubs_table) < 0)
951         {
952                 LM_ERR("use table failed\n");
953                 goto done;
954         }
955
956         if (dbmode == RLS_DB_ONLY && rls_dbf.start_transaction)
957         {
958                 if (rls_dbf.start_transaction(rls_db, DB_LOCKING_WRITE) < 0)
959                 {
960                         LM_ERR("in start_transaction\n");
961                         goto done;
962                 }
963         }
964
965         /* Step 1: Find rls_watchers that require full-state notification */
966         if (query_fn(rls_db, query_cols, 0, query_vals, result_cols,
967                                 1, n_result_cols, 0, &result) < 0)
968         {
969                 LM_ERR("in sql query\n");
970                 goto done;
971         }
972         if(result== NULL || result->n<= 0)
973                 goto done;
974
975         /* Step 2: Reset the update flag so we do not full-state notify
976            these watchers again */
977         if(rls_dbf.update(rls_db, query_cols, 0, query_vals, update_cols,
978                                         update_vals, 1, 1)< 0)
979         {
980                 LM_ERR("in sql update\n");
981                 goto done;
982         }
983
984         if (dbmode == RLS_DB_ONLY && rls_dbf.end_transaction)
985         {
986                 if (rls_dbf.end_transaction(rls_db) < 0)
987                 {
988                         LM_ERR("in end_transaction\n");
989                         goto done;
990                 }
991         }
992
993         /* Step 3: Full-state notify each watcher we found */
994         rows = RES_ROWS(result);
995         for (i = 0; i < RES_ROW_N(result); i++)
996         {
997                 memset(&sub, 0, sizeof(subs_t));
998                 values = ROW_VALUES(&rows[i]);
999                 EXTRACT_STRING(sub.pres_uri, VAL_STRING(&values[pres_uri_col]));
1000                 EXTRACT_STRING(sub.to_user, VAL_STRING(&values[tuser_col]));
1001                 EXTRACT_STRING(sub.to_domain, VAL_STRING(&values[tdomain_col]));
1002                 EXTRACT_STRING(sub.from_user, VAL_STRING(&values[fuser_col]));
1003                 EXTRACT_STRING(sub.from_domain, VAL_STRING(&values[fdomain_col]));
1004                 EXTRACT_STRING(sub.watcher_user, VAL_STRING(&values[wuser_col]));
1005                 EXTRACT_STRING(sub.watcher_domain, VAL_STRING(&values[wdomain_col]));
1006                 EXTRACT_STRING(sub.callid, VAL_STRING(&values[callid_col]));
1007                 EXTRACT_STRING(sub.to_tag, VAL_STRING(&values[to_tag_col]));
1008                 EXTRACT_STRING(sub.from_tag, VAL_STRING(&values[from_tag_col]));
1009                 EXTRACT_STRING(sub.sockinfo_str, VAL_STRING(&values[sockinfo_col]));
1010                 EXTRACT_STRING(sub.local_contact, VAL_STRING(&values[lcontact_col]));
1011                 EXTRACT_STRING(sub.contact, VAL_STRING(&values[contact_col]));
1012                 EXTRACT_STRING(sub.record_route, VAL_STRING(&values[rroute_col]));
1013                 EXTRACT_STRING(sub.event_id, VAL_STRING(&values[event_id_col]));
1014                 EXTRACT_STRING(sub.reason, VAL_STRING(&values[reason_col]));
1015                 EXTRACT_STRING(ev_sname, VAL_STRING(&values[event_col]));
1016                 sub.event = pres_contains_event(&ev_sname, &parsed_event);
1017                 if (sub.event == NULL)
1018                 {
1019                         LM_ERR("event not found and set to NULL\n");
1020                         goto done;
1021                 }
1022
1023                 sub.local_cseq = VAL_INT(&values[lcseq_col]);
1024                 sub.remote_cseq = VAL_INT(&values[rcseq_col]);
1025                 sub.status = VAL_INT(&values[status_col]);
1026                 sub.version = VAL_INT(&values[version_col]);
1027                 if (VAL_INT(&values[expires_col]) > now + rls_expires_offset)
1028                 {
1029                         sub.expires = VAL_INT(&values[expires_col]) - now;
1030
1031                         if (rls_get_service_list(&sub.pres_uri, &sub.watcher_user,
1032                                 &sub.watcher_domain, &service_node, &doc) < 0)
1033                         {
1034                                 LM_ERR("failed getting resource list\n");
1035                                 goto done;
1036                         }
1037                         if (doc == NULL)
1038                         {
1039                                 LM_WARN("no document returned for uri <%.*s>\n",
1040                                         sub.pres_uri.len, sub.pres_uri.s);
1041                                 goto done;
1042                         }
1043
1044                         if (send_full_notify(&sub, service_node, &sub.pres_uri, 0) < 0)
1045                         {
1046                                 LM_ERR("failed sending full state notify\n");
1047                                 goto done;
1048                         }
1049                         xmlFreeDoc(doc);
1050                         doc = NULL;
1051                 }
1052                 else
1053                 {
1054                         sub.expires = 0;
1055                         rls_send_notify(&sub, NULL, NULL, NULL);
1056                         delete_rlsdb(&sub.callid, &sub.to_tag, &sub.from_tag);
1057                 }
1058         }
1059
1060 done:
1061         if (result != NULL)
1062                 rls_dbf.free_result(rls_db, result);
1063         if (doc != NULL)
1064                 xmlFreeDoc(doc);
1065         if (dbmode == RLS_DB_ONLY && rls_dbf.abort_transaction)
1066         {
1067                 if (rls_dbf.abort_transaction(rls_db) < 0)
1068                         LM_ERR("in abort_transaction\n");
1069         }
1070 }
1071
1072 static void timer_send_update_notifies(int round)
1073 {
1074         db_key_t query_cols[1], update_cols[1], result_cols[6];
1075         db_val_t query_vals[1], update_vals[1];
1076         int did_col, resource_uri_col, auth_state_col, reason_col,
1077                 pres_state_col, content_type_col;
1078         int n_result_cols= 0;
1079         db1_res_t *result= NULL;
1080         db_query_f query_fn = rlpres_dbf.query_lock ? rlpres_dbf.query_lock : rlpres_dbf.query;
1081
1082         query_cols[0]= &str_updated_col;
1083         query_vals[0].type = DB1_INT;
1084         query_vals[0].nul = 0;
1085         query_vals[0].val.int_val= round;
1086
1087         result_cols[did_col= n_result_cols++]= &str_rlsubs_did_col;
1088         result_cols[resource_uri_col= n_result_cols++]= &str_resource_uri_col;
1089         result_cols[auth_state_col= n_result_cols++]= &str_auth_state_col;
1090         result_cols[content_type_col= n_result_cols++]= &str_content_type_col;
1091         result_cols[reason_col= n_result_cols++]= &str_reason_col;
1092         result_cols[pres_state_col= n_result_cols++]= &str_presence_state_col;
1093
1094         update_cols[0]= &str_updated_col;
1095         update_vals[0].type = DB1_INT;
1096         update_vals[0].nul = 0;
1097         update_vals[0].val.int_val= NO_UPDATE_TYPE; 
1098
1099         /* query in alphabetical order after rlsusbs_did 
1100          * (resource list Subscribe dialog indentifier)*/
1101         
1102         if (rlpres_dbf.use_table(rlpres_db, &rlpres_table) < 0) 
1103         {
1104                 LM_ERR("in use_table\n");
1105                 goto done;
1106         }
1107
1108         if (dbmode == RLS_DB_ONLY && rlpres_dbf.start_transaction)
1109         {
1110                 if (rlpres_dbf.start_transaction(rlpres_db, DB_LOCKING_WRITE) < 0)
1111                 {
1112                         LM_ERR("in start_transaction\n");
1113                         goto done;
1114                 }
1115         }
1116
1117         if(query_fn(rlpres_db, query_cols, 0, query_vals, result_cols,
1118                                         1, n_result_cols, &str_rlsubs_did_col, &result)< 0)
1119         {
1120                 LM_ERR("in sql query\n");
1121                 goto done;
1122         }
1123         if(result == NULL || result->n <= 0)
1124                 goto done;
1125
1126         if(rlpres_dbf.update(rlpres_db, query_cols, 0, query_vals, update_cols,
1127                                         update_vals, 1, 1)< 0)
1128         {
1129                 LM_ERR("in sql update\n");
1130                 goto done;
1131         }
1132
1133         if (dbmode == RLS_DB_ONLY && rlpres_dbf.end_transaction)
1134         {
1135                 if (rlpres_dbf.end_transaction(rlpres_db) < 0)
1136                 {
1137                         LM_ERR("in end_transaction\n");
1138                         goto done;
1139                 }
1140         }
1141
1142         send_notifies(result, did_col, resource_uri_col, auth_state_col, reason_col,
1143                   pres_state_col, content_type_col);
1144 done:
1145         if(result)
1146                 rlpres_dbf.free_result(rlpres_db, result);
1147
1148         if (dbmode == RLS_DB_ONLY && rls_dbf.abort_transaction)
1149         {
1150                 if (rlpres_dbf.abort_transaction(rlpres_db) < 0)
1151                         LM_ERR("in abort_transaction\n");
1152         }
1153 }
1154
1155 void timer_send_notify(unsigned int ticks,void *param)
1156 {
1157         if (dbmode == RLS_DB_ONLY)
1158         {
1159                 int process_num = *((int *) param);
1160                 int round = subset + (waitn_time * rls_notifier_poll_rate * process_num);
1161                 if (++subset > (waitn_time * rls_notifier_poll_rate) - 1) subset = 0;
1162
1163                 timer_send_full_state_notifies(round);
1164                 timer_send_update_notifies(round);
1165         }
1166         else
1167                 timer_send_update_notifies(UPDATED_TYPE);
1168 }
1169
1170
1171 /* function to periodicaly clean the rls_presentity table */
1172
1173 void rls_presentity_clean(unsigned int ticks,void *param)
1174 {
1175         db_key_t query_cols[1];
1176         db_op_t query_ops[1];
1177         db_val_t query_vals[1];
1178
1179         query_cols[0]= &str_expires_col;
1180         query_ops[0]= OP_LT;
1181         query_vals[0].nul= 0;
1182         query_vals[0].type= DB1_INT;
1183         query_vals[0].val.int_val= (int)time(NULL) - rls_expires_offset;
1184
1185         if (rlpres_dbf.use_table(rlpres_db, &rlpres_table) < 0) 
1186         {
1187                 LM_ERR("in use_table\n");
1188                 return ;
1189         }
1190
1191         if(rlpres_dbf.delete(rlpres_db, query_cols, query_ops, query_vals, 1)< 0)
1192         {
1193                 LM_ERR("in sql delete\n");
1194                 return ;
1195         }
1196
1197 }