2 * pua module - presence user agent module
4 * Copyright (C) 2006 Voice Sistem S.R.L.
6 * This file is part of Kamailio, a free SIP server.
8 * Kamailio is free software; you can redistribute it and/or modify
9 * it under the terms of the GNU General Public License as published by
10 * the Free Software Foundation; either version 2 of the License, or
11 * (at your option) any later version
13 * Kamailio is distributed in the hope that it will be useful,
14 * but WITHOUT ANY WARRANTY; without even the implied warranty of
15 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
16 * GNU General Public License for more details.
18 * You should have received a copy of the GNU General Public License
19 * along with this program; if not, write to the Free Software
20 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
28 #include <libxml/parser.h>
31 #include "../../core/mem/mem.h"
32 #include "../../core/dprint.h"
33 #include "../../core/parser/parse_expires.h"
34 #include "../../core/dprint.h"
35 #include "../../core/mem/shm_mem.h"
36 #include "../../core/parser/msg_parser.h"
37 #include "../../core/strutils.h"
38 #include "../../core/hashes.h"
39 #include "../../modules/tm/tm_load.h"
42 #include "send_publish.h"
43 #include "pua_callback.h"
44 #include "event_list.h"
47 extern db_locking_t db_table_lock;
49 str* publ_build_hdr(int expires, pua_event_t* ev, str* content_type, str* etag,
50 str* extra_headers, int is_body)
52 static char buf[3000];
54 char* expires_s = NULL;
59 str_hdr =(str*)pkg_malloc(sizeof(str));
62 LM_ERR("no more memory\n");
65 memset(str_hdr, 0 , sizeof(str));
70 memcpy(str_hdr->s ,"Max-Forwards: ", 14);
72 str_hdr->len+= sprintf(str_hdr->s+ str_hdr->len,"%d", MAX_FORWARD);
73 memcpy(str_hdr->s+str_hdr->len, CRLF, CRLF_LEN);
74 str_hdr->len += CRLF_LEN;
76 memcpy(str_hdr->s+ str_hdr->len ,"Event: ", 7);
78 memcpy(str_hdr->s+ str_hdr->len, ev->name.s, ev->name.len);
79 str_hdr->len+= ev->name.len;
80 memcpy(str_hdr->s+str_hdr->len, CRLF, CRLF_LEN);
81 str_hdr->len += CRLF_LEN;
84 memcpy(str_hdr->s+str_hdr->len ,"Expires: ", 9);
97 expires_s = int2str(t, &len);
99 memcpy(str_hdr->s+str_hdr->len, expires_s, len);
101 memcpy(str_hdr->s+str_hdr->len, CRLF, CRLF_LEN);
102 str_hdr->len += CRLF_LEN;
106 LM_DBG("UPDATE_TYPE [etag]= %.*s\n", etag->len, etag->s);
107 memcpy(str_hdr->s+str_hdr->len,"SIP-If-Match: ", 14);
109 memcpy(str_hdr->s+str_hdr->len, etag->s, etag->len);
110 str_hdr->len += etag->len;
111 memcpy(str_hdr->s+str_hdr->len, CRLF, CRLF_LEN);
112 str_hdr->len += CRLF_LEN;
116 if(content_type== NULL || content_type->s== NULL || content_type->len== 0)
118 ctype= ev->content_type; /* use event default value */
122 ctype.s= content_type->s;
123 ctype.len= content_type->len;
126 memcpy(str_hdr->s+str_hdr->len,"Content-Type: ", 14);
128 memcpy(str_hdr->s+str_hdr->len, ctype.s, ctype.len);
129 str_hdr->len += ctype.len;
130 memcpy(str_hdr->s+str_hdr->len, CRLF, CRLF_LEN);
131 str_hdr->len += CRLF_LEN;
134 if(extra_headers && extra_headers->s && extra_headers->len)
136 memcpy(str_hdr->s+str_hdr->len,extra_headers->s , extra_headers->len);
137 str_hdr->len += extra_headers->len;
139 str_hdr->s[str_hdr->len] = '\0';
145 static void find_and_delete_record(ua_pres_t *dialog, int hash_code)
147 ua_pres_t *presentity;
149 if (dbmode == PUA_DB_ONLY)
151 delete_record_puadb(dialog);
155 lock_get(&HashT->p_records[hash_code].lock);
156 presentity = search_htable(dialog, hash_code);
157 if (presentity == NULL)
159 LM_DBG("Record found in table and deleted\n");
160 lock_release(&HashT->p_records[hash_code].lock);
163 delete_htable(presentity, hash_code);
164 lock_release(&HashT->p_records[hash_code].lock);
168 static int find_and_update_record(ua_pres_t *dialog, int hash_code, int lexpire, str *etag)
170 ua_pres_t *presentity;
172 if(dbmode==PUA_DB_ONLY)
174 return update_record_puadb(dialog, lexpire, etag);
178 lock_get(&HashT->p_records[hash_code].lock);
179 presentity = search_htable(dialog, hash_code);
180 if (presentity == NULL)
182 LM_DBG("Record found in table and deleted\n");
183 lock_release(&HashT->p_records[hash_code].lock);
186 update_htable(presentity, dialog->desired_expires, lexpire, etag, hash_code, NULL);
187 lock_release(&HashT->p_records[hash_code].lock);
192 void publ_cback_func(struct cell *t, int type, struct tmcb_params *ps)
194 struct hdr_field* hdr= NULL;
195 struct sip_msg* msg= NULL;
196 ua_pres_t* presentity= NULL;
197 ua_pres_t* db_presentity= NULL;
198 ua_pres_t* hentity= NULL;
201 unsigned int lexpire= 0;
203 unsigned int hash_code;
206 str pres_uri={0,0}, watcher_uri={0,0}, extra_headers={0,0};
207 int end_transaction = 1;
209 memset(&dbpres, 0, sizeof(dbpres));
210 dbpres.pres_uri = &pres_uri;
211 dbpres.watcher_uri = &watcher_uri;
212 dbpres.extra_headers = &extra_headers;
214 if (dbmode == PUA_DB_ONLY && pua_dbf.start_transaction)
216 if (pua_dbf.start_transaction(pua_db, db_table_lock) < 0)
218 LM_ERR("in start_transaction\n");
223 if(ps->param== NULL|| *ps->param== NULL)
225 LM_ERR("NULL callback parameter\n");
228 hentity= (ua_pres_t*)(*ps->param);
233 LM_ERR("no reply message found\n ");
238 if(msg== FAKED_REPLY)
240 LM_DBG("FAKED_REPLY\n");
244 hash_code= core_hash(hentity->pres_uri, NULL, HASH_SIZE);
248 find_and_delete_record(hentity, hash_code);
250 if(ps->code== 412 && hentity->body && hentity->flag!= RPC_PUBLISH
251 && hentity->flag!= RPC_ASYN_PUBLISH)
253 /* sent a PUBLISH within a dialog that no longer exists
254 * send again an intial PUBLISH */
255 LM_DBG("received a 412 reply- try again to send PUBLISH\n");
257 memset(&publ, 0, sizeof(publ_info_t));
258 publ.pres_uri= hentity->pres_uri;
259 publ.body= hentity->body;
261 if(hentity->desired_expires== 0)
264 if(hentity->desired_expires<= (int)time(NULL))
267 publ.expires= hentity->desired_expires- (int)time(NULL)+ 3;
269 publ.source_flag|= hentity->flag;
270 publ.event|= hentity->event;
271 publ.content_type= hentity->content_type;
272 publ.id= hentity->id;
273 publ.extra_headers= hentity->extra_headers;
274 publ.outbound_proxy = hentity->outbound_proxy;
275 publ.cb_param= hentity->cb_param;
277 if (dbmode == PUA_DB_ONLY && pua_dbf.end_transaction)
279 if (pua_dbf.end_transaction(pua_db) < 0)
281 LM_ERR("in end_transaction\n");
288 if(send_publish(&publ)< 0)
290 LM_ERR("when trying to send PUBLISH\n");
297 if( parse_headers(msg,HDR_EOH_F, 0)==-1 )
299 LM_ERR("parsing headers\n");
302 if(msg->expires== NULL || msg->expires->body.len<= 0)
304 LM_ERR("No Expires header found\n");
308 if (!msg->expires->parsed && (parse_expires(msg->expires) < 0))
310 LM_ERR("cannot parse Expires header\n");
313 lexpire = ((exp_body_t*)msg->expires->parsed)->val;
314 LM_DBG("lexpire= %u\n", lexpire);
319 if(cmp_hdrname_strzn(&hdr->name, "SIP-ETag",8)==0 )
326 if(found== 0) /* must find SIP-Etag header field in 200 OK msg*/
328 LM_ERR("no SIP-ETag header field found\n");
333 LM_DBG("completed with status %d [contact:%.*s]\n",
334 ps->code, hentity->pres_uri->len, hentity->pres_uri->s);
338 find_and_delete_record(hentity, hash_code);
342 if (hentity->etag.s) {
343 if (pua_dbf.affected_rows != NULL || dbmode != PUA_DB_ONLY) {
344 if (find_and_update_record(hentity, hash_code,
348 else if ((db_presentity =
349 get_record_puadb(hentity->id, &hentity->etag,
350 &dbpres, &res)) != NULL)
352 update_record_puadb(hentity, lexpire, &etag);
357 size= sizeof(ua_pres_t)+ sizeof(str)+
358 (hentity->pres_uri->len+ hentity->tuple_id.len +
359 hentity->id.len)* sizeof(char);
360 if(hentity->extra_headers)
361 size+= sizeof(str)+ hentity->extra_headers->len* sizeof(char);
363 presentity= (ua_pres_t*)shm_malloc(size);
364 if(presentity== NULL)
366 LM_ERR("no more share memory\n");
369 memset(presentity, 0, size);
371 size= sizeof(ua_pres_t);
372 presentity->pres_uri= (str*)((char*)presentity+ size);
375 presentity->pres_uri->s= (char*)presentity+ size;
376 memcpy(presentity->pres_uri->s, hentity->pres_uri->s,
377 hentity->pres_uri->len);
378 presentity->pres_uri->len= hentity->pres_uri->len;
379 size+= hentity->pres_uri->len;
381 presentity->tuple_id.s= (char*)presentity+ size;
382 memcpy(presentity->tuple_id.s, hentity->tuple_id.s,
383 hentity->tuple_id.len);
384 presentity->tuple_id.len= hentity->tuple_id.len;
385 size+= presentity->tuple_id.len;
387 presentity->id.s=(char*)presentity+ size;
388 memcpy(presentity->id.s, hentity->id.s,
390 presentity->id.len= hentity->id.len;
391 size+= presentity->id.len;
393 if(hentity->extra_headers)
395 presentity->extra_headers= (str*)((char*)presentity+ size);
397 presentity->extra_headers->s= (char*)presentity+ size;
398 memcpy(presentity->extra_headers->s, hentity->extra_headers->s,
399 hentity->extra_headers->len);
400 presentity->extra_headers->len= hentity->extra_headers->len;
401 size+= hentity->extra_headers->len;
404 presentity->desired_expires= hentity->desired_expires;
405 presentity->expires= lexpire+ (int)time(NULL);
406 presentity->flag|= hentity->flag;
407 presentity->event|= hentity->event;
409 presentity->etag.s= (char*)shm_malloc(etag.len* sizeof(char));
410 if(presentity->etag.s== NULL)
412 LM_ERR("No more share memory\n");
415 memcpy(presentity->etag.s, etag.s, etag.len);
416 presentity->etag.len= etag.len;
418 if (dbmode==PUA_DB_ONLY)
420 insert_record_puadb(presentity);
424 lock_get(&HashT->p_records[hash_code].lock);
425 insert_htable(presentity, hash_code);
426 lock_release(&HashT->p_records[hash_code].lock);
428 LM_DBG("Inserted record\n");
431 if(hentity->ua_flag == REQ_OTHER)
433 run_pua_callbacks(hentity, msg);
437 shm_free(*ps->param);
440 if(dbmode==PUA_DB_ONLY && presentity)
442 shm_free(presentity->etag.s);
443 shm_free(presentity);
446 if (res) free_results_puadb(res);
448 if (dbmode == PUA_DB_ONLY && pua_dbf.end_transaction && end_transaction)
450 if (pua_dbf.end_transaction(pua_db) < 0)
452 LM_ERR("in end_transaction\n");
460 if(ps->param && *ps->param)
462 shm_free(*ps->param);
465 if(presentity) shm_free(presentity);
467 if (res) free_results_puadb(res);
469 if (dbmode == PUA_DB_ONLY && pua_dbf.abort_transaction)
471 if (pua_dbf.abort_transaction(pua_db) < 0)
472 LM_ERR("in abort_transaction\n");
478 int send_publish( publ_info_t* publ )
480 str met = {"PUBLISH", 7};
482 ua_pres_t* presentity= NULL;
485 ua_pres_t* cb_param= NULL;
486 unsigned int hash_code=0;
491 pua_event_t* ev= NULL;
495 str pres_uri={0,0}, watcher_uri={0,0}, extra_headers={0,0};
498 LM_DBG("pres_uri=%.*s\n", publ->pres_uri->len, publ->pres_uri->s );
500 if (dbmode == PUA_DB_ONLY && pua_dbf.start_transaction)
502 if (pua_dbf.start_transaction(pua_db, db_table_lock) < 0)
504 LM_ERR("in start_transaction\n");
509 /* get event from list */
510 ev= get_event(publ->event);
513 LM_ERR("event not found in list\n");
517 if (dbmode==PUA_DB_ONLY)
520 memset(&dbpres, 0, sizeof(dbpres));
521 dbpres.pres_uri = &pres_uri;
522 dbpres.watcher_uri = &watcher_uri;
523 dbpres.extra_headers = &extra_headers;
524 presentity = get_record_puadb(publ->id, publ->etag,
532 memset(&pres, 0, sizeof(ua_pres_t));
533 pres.pres_uri = publ->pres_uri;
534 pres.flag = publ->source_flag;
536 pres.event = publ->event;
538 pres.etag = *publ->etag;
540 hash_code= core_hash(publ->pres_uri, NULL, HASH_SIZE);
541 lock_get(&HashT->p_records[hash_code].lock);
542 presentity= search_htable(&pres, hash_code);
545 if(publ->etag && presentity== NULL)
547 if (dbmode!=PUA_DB_ONLY)
548 lock_release(&HashT->p_records[hash_code].lock);
553 if(publ->flag & INSERT_TYPE)
555 LM_DBG("Insert flag set\n");
559 if(presentity== NULL)
562 if (dbmode!=PUA_DB_ONLY)
563 lock_release(&HashT->p_records[hash_code].lock);
564 LM_DBG("insert type\n");
566 if(publ->flag & UPDATE_TYPE )
568 LM_DBG("UPDATE_TYPE and no record found \n");
569 publ->flag= INSERT_TYPE;
571 if(publ->expires== 0)
573 LM_DBG("request for a publish with expires 0 and"
574 " no record found\n");
578 if(publ->body== NULL)
580 LM_ERR("New PUBLISH and no body found- invalid request\n");
581 ret = ERR_PUBLISH_NO_BODY;
587 LM_DBG("record found\n");
588 publ->flag= UPDATE_TYPE;
589 etag.s= (char*)pkg_malloc(presentity->etag.len* sizeof(char));
592 LM_ERR("while allocating memory\n");
593 if (dbmode!=PUA_DB_ONLY)
594 lock_release(&HashT->p_records[hash_code].lock);
597 memcpy(etag.s, presentity->etag.s, presentity->etag.len);
598 etag.len= presentity->etag.len;
600 if(presentity->tuple_id.s && presentity->tuple_id.len)
603 tuple_id=(str*)pkg_malloc(sizeof(str));
606 LM_ERR("No more memory\n");
607 if (dbmode!=PUA_DB_ONLY)
608 lock_release(&HashT->p_records[hash_code].lock);
611 tuple_id->s= (char*)pkg_malloc(presentity->tuple_id.len* sizeof(char));
612 if(tuple_id->s== NULL)
614 LM_ERR("No more memory\n");
615 if (dbmode!=PUA_DB_ONLY)
616 lock_release(&HashT->p_records[hash_code].lock);
619 memcpy(tuple_id->s, presentity->tuple_id.s, presentity->tuple_id.len);
620 tuple_id->len= presentity->tuple_id.len;
623 if(publ->expires== 0)
625 LM_DBG("expires= 0- delete from hash table\n");
626 if (dbmode!=PUA_DB_ONLY)
627 lock_release(&HashT->p_records[hash_code].lock);
631 presentity->version++;
632 ver= presentity->version;
634 if (dbmode==PUA_DB_ONLY)
636 update_version_puadb(presentity);
640 lock_release(&HashT->p_records[hash_code].lock);
645 if(publ->body && publ->body->s)
647 ret_code= ev->process_body(publ, &body, ver, &tuple_id );
648 if( ret_code< 0 || body== NULL)
650 LM_ERR("while processing body\n");
652 LM_ERR("NULL body\n");
657 LM_DBG("tuple_id= %.*s\n", tuple_id->len, tuple_id->s );
661 /* construct the callback parameter */
662 if(etag.s && etag.len)
665 cb_param= publish_cbparam(publ, body, tuple_id, REQ_OTHER);
668 LM_ERR("constructing callback parameter\n");
672 if(publ->flag & UPDATE_TYPE)
673 LM_DBG("etag:%.*s\n", etag.len, etag.s);
674 str_hdr = publ_build_hdr((publ->expires< 0)?3600:publ->expires, ev, &publ->content_type,
675 (publ->flag & UPDATE_TYPE)?&etag:NULL, publ->extra_headers, (body)?1:0);
679 LM_ERR("while building extra_headers\n");
683 LM_DBG("publ->pres_uri:\n%.*s\n ", publ->pres_uri->len, publ->pres_uri->s);
684 LM_DBG("str_hdr:\n%.*s %d\n ", str_hdr->len, str_hdr->s, str_hdr->len);
685 if(body && body->len && body->s )
686 LM_DBG("body:\n%.*s\n ", body->len, body->s);
688 set_uac_req(&uac_r, &met, str_hdr, body, 0, TMCB_LOCAL_COMPLETED,
689 publ_cback_func, (void*)cb_param);
690 result= tmb.t_request(&uac_r,
691 publ->pres_uri, /*! Request-URI */
692 publ->pres_uri, /*! To */
693 publ->pres_uri, /*! From */
694 publ->outbound_proxy?
695 publ->outbound_proxy:&outbound_proxy /*! Outbound proxy*/
700 LM_ERR("in t_request tm module function\n");
707 if (dbmode == PUA_DB_ONLY && pua_dbf.end_transaction)
709 if (pua_dbf.end_transaction(pua_db) < 0)
711 LM_ERR("in end_transaction\n");
721 if (dbmode == PUA_DB_ONLY && pua_dbf.abort_transaction)
723 if (pua_dbf.abort_transaction(pua_db) < 0)
724 LM_ERR("in abort_transaction\n");
742 pkg_free(tuple_id->s);
745 free_results_puadb(res);
750 ua_pres_t* publish_cbparam(publ_info_t* publ,str* body,str* tuple_id,
754 ua_pres_t* cb_param= NULL;
756 size= sizeof(ua_pres_t)+ sizeof(str)+ (publ->pres_uri->len+
757 + publ->content_type.len+ publ->id.len+ 1)*sizeof(char);
759 if(publ->outbound_proxy)
760 size+= sizeof(str)+ publ->outbound_proxy->len* sizeof(char);
761 if(body && body->s && body->len)
762 size+= sizeof(str)+ body->len* sizeof(char);
764 size+= publ->etag->len* sizeof(char);
765 if(publ->extra_headers)
766 size+= sizeof(str)+ publ->extra_headers->len* sizeof(char);
768 size+= tuple_id->len* sizeof(char);
770 cb_param= (ua_pres_t*)shm_malloc(size);
773 LM_ERR("ERROR no more share memory while allocating cb_param"
774 " - size= %d\n", size);
777 memset(cb_param, 0, size);
779 size = sizeof(ua_pres_t);
781 cb_param->pres_uri = (str*)((char*)cb_param + size);
783 cb_param->pres_uri->s = (char*)cb_param + size;
784 memcpy(cb_param->pres_uri->s, publ->pres_uri->s ,
785 publ->pres_uri->len ) ;
786 cb_param->pres_uri->len= publ->pres_uri->len;
787 size+= publ->pres_uri->len;
789 if(publ->id.s && publ->id.len)
791 cb_param->id.s = ((char*)cb_param+ size);
792 memcpy(cb_param->id.s, publ->id.s, publ->id.len);
793 cb_param->id.len= publ->id.len;
797 if(body && body->s && body->len)
799 cb_param->body = (str*)((char*)cb_param + size);
802 cb_param->body->s = (char*)cb_param + size;
803 memcpy(cb_param->body->s, body->s ,
805 cb_param->body->len= body->len;
810 cb_param->etag.s = (char*)cb_param + size;
811 memcpy(cb_param->etag.s, publ->etag->s ,
813 cb_param->etag.len= publ->etag->len;
814 size+= publ->etag->len;
816 if(publ->extra_headers)
818 cb_param->extra_headers = (str*)((char*)cb_param + size);
820 cb_param->extra_headers->s = (char*)cb_param + size;
821 memcpy(cb_param->extra_headers->s, publ->extra_headers->s ,
822 publ->extra_headers->len ) ;
823 cb_param->extra_headers->len= publ->extra_headers->len;
824 size+= publ->extra_headers->len;
826 if(publ->outbound_proxy)
828 cb_param->outbound_proxy = (str*)((char*)cb_param + size);
830 cb_param->outbound_proxy->s = (char*)cb_param + size;
831 memcpy(cb_param->outbound_proxy->s, publ->outbound_proxy->s,
832 publ->outbound_proxy->len);
833 cb_param->outbound_proxy->len = publ->outbound_proxy->len;
834 size+= publ->outbound_proxy->len;
837 if(publ->content_type.s && publ->content_type.len)
839 cb_param->content_type.s= (char*)cb_param + size;
840 memcpy(cb_param->content_type.s, publ->content_type.s, publ->content_type.len);
841 cb_param->content_type.len= publ->content_type.len;
842 size+= publ->content_type.len;
846 cb_param->tuple_id.s = (char*)cb_param+ size;
847 memcpy(cb_param->tuple_id.s, tuple_id->s ,tuple_id->len);
848 cb_param->tuple_id.len= tuple_id->len;
849 size+= tuple_id->len;
852 cb_param->event= publ->event;
853 cb_param->flag|= publ->source_flag;
854 cb_param->cb_param= publ->cb_param;
855 cb_param->ua_flag= ua_flag;
858 cb_param->desired_expires= 0;
860 cb_param->desired_expires=publ->expires+ (int)time(NULL);