all: updated FSF address in GPL text
[sip-router] / modules / db_cassandra / dbcassa_table.c
1 /*
2  * $Id$
3  *
4  * Copyright (C) 2012 1&1 Internet Development
5  *
6  * This file is part of Kamailio, a free SIP server.
7  *
8  * Kamailio is free software; you can redistribute it and/or modify
9  * it under the terms of the GNU General Public License as published by
10  * the Free Software Foundation; either version 2 of the License, or
11  * (at your option) any later version
12  *
13  * Kamailio is distributed in the hope that it will be useful,
14  * but WITHOUT ANY WARRANTY; without even the implied warranty of
15  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
16  * GNU General Public License for more details.
17  *
18  * You should have received a copy of the GNU General Public License 
19  * along with this program; if not, write to the Free Software 
20  * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301  USA
21  * 
22  * History:
23  * --------
24  * 2012-01 initial version (Anca Vamanu)
25  *
26  */
27
28 #include <stdio.h>
29 #include <string.h>
30 #include <time.h>
31 #include <sys/types.h>
32 #include <sys/stat.h>
33 #include <dirent.h>
34
35 #include "../../mem/shm_mem.h"
36 #include "../../mem/mem.h"
37 #include "../../dprint.h"
38 #include "../../hashes.h"
39 #include "../../lock_ops.h"
40
41 #include "dbcassa_table.h"
42
/* Number of buckets allocated for dbcassa_tbl_htable; also the size
 * argument handed to core_hash() when computing a table's bucket. */
#define DBCASSA_TABLE_SIZE 16

/*
 * Per-bucket reader/writer guard driven by the ref_read_data(),
 * unref_read_data(), ref_write_data() and unref_write_data() macros.
 */
typedef struct  rw_lock {
        gen_lock_t lock;        /* taken around data_refcnt updates and when setting reload_flag */
        int reload_flag;        /* set to 1 by ref_write_data(); readers spin while it is set */
        int data_refcnt;        /* number of read references currently held on the bucket */
} rw_lock_t;

/* One bucket of the schema hash table: its guard plus the head of the
 * singly linked list of table schemas (chained through ->next). */
typedef struct _dbcassa_tbl_htable
{
        rw_lock_t lock;
        dbcassa_table_p dtp;
} dbcassa_tbl_htable_t, *dbcassa_tbl_htable_p;

/* Hash table of loaded schemas; allocated in dbcassa_read_table_schemas()
 * and released in dbcassa_destroy_htable(). */
static dbcassa_tbl_htable_p dbcassa_tbl_htable = NULL;
/* Root directory of the schema files; defined outside this file. */
extern str dbcassa_schema_path;
/* Scratch buffer holding the path of the schema file currently being
 * processed. It is filled by dbcassa_read_table_schemas() and
 * dbcassa_db_get_table() and read by the stat()/fopen() calls below. */
static char full_path_buf[_POSIX_PATH_MAX + 1];
60
61 /**
62  * Check if file modified from last read
63  * -1 - error
64  *  0 - no change
65  *  1 - changed
66  */
67 int dbcassa_check_mtime(time_t *mt)
68 {
69         struct stat s;
70
71         if(stat(full_path_buf, &s) == 0)
72         {
73                 if((int)s.st_mtime > (int)*mt)
74                 {
75                         *mt = s.st_mtime;
76                         LM_DBG("[%s] was updated\n", full_path_buf);
77                         return 1;
78                 }
79         } else {
80                 LM_DBG("stat failed on [%s]\n", full_path_buf);
81                 return -1;
82         }
83         return 0;
84 }
85
86 /*
87  *      Create new table structure
88  *
89  * */
90
91 dbcassa_table_p dbcassa_table_new(const str *_tbname, const str *_dbname)
92 {
93         struct stat s;
94         dbcassa_table_p dtp = NULL;
95         int size;
96
97         if(!_tbname || !_dbname) {
98                 LM_ERR("Invalid parameters\n");
99                 return 0;
100         }
101
102         size = sizeof(dbcassa_table_t)+_tbname->len+_dbname->len;
103         dtp = (dbcassa_table_p)shm_malloc(size);
104         if(!dtp) {
105                 LM_ERR("No more shared memory\n");
106                 return 0;
107         }
108
109         memset(dtp, 0, size);
110         size = sizeof(dbcassa_table_t);
111         dtp->name.s = (char*)dtp + size;
112         memcpy(dtp->name.s, _tbname->s, _tbname->len);
113         dtp->name.len = _tbname->len;
114         size+= _tbname->len;
115
116         dtp->dbname.s = (char*)dtp + size;
117         memcpy(dtp->dbname.s, _dbname->s, _dbname->len);
118         dtp->dbname.len = _dbname->len;
119
120         if(stat(full_path_buf, &s) == 0) {
121                 dtp->mt = s.st_mtime;
122                 LM_DBG("mtime is %d\n", (int)s.st_mtime);
123         }
124
125         return dtp;
126 }
127
128 dbcassa_column_p dbcassa_column_new(char *_s, int _l)
129 {
130         dbcassa_column_p dcp;
131         int size;
132
133         size = sizeof(dbcassa_column_t) + _l+ 1;
134         dcp = (dbcassa_column_p)shm_malloc(size);
135         if(!dcp) {
136                 LM_ERR("No more shared memory\n");
137                 return 0;
138         }
139         memset(dcp, 0, size);
140         dcp->name.s = (char*)dcp + sizeof(dbcassa_column_t);
141         memcpy(dcp->name.s, _s, _l);
142         dcp->name.len = _l;
143         dcp->name.s[_l] = '\0';
144
145         return dcp;
146 }
147
148 int dbcassa_column_free(dbcassa_column_p dcp)
149 {
150         if(!dcp)
151                 return -1;
152         shm_free(dcp);
153         return 0;
154 }
155
156 int dbcassa_table_free(dbcassa_table_p _dtp)
157 {
158         dbcassa_column_p _cp, _cp0;
159         
160         if(!_dtp)
161                 return -1;
162
163         /* cols*/
164         _cp = _dtp->cols;
165         while(_cp) {
166                 _cp0=_cp;
167                 _cp=_cp->next;
168                 dbcassa_column_free(_cp0);
169         }
170         /* key */
171         if(_dtp->key)
172                 shm_free(_dtp->key);
173         if(_dtp->sec_key)
174                 shm_free(_dtp->sec_key);
175
176         shm_free(_dtp);
177
178         return 0;
179 }
180
181 /**
182  * Load the table schema from file
183  */
184 dbcassa_table_p dbcassa_load_file(str* dbn, str* tbn)
185 {
186 #define KEY_MAX_LEN 10
187         FILE *fin=NULL;
188         char buf[4096];
189         int c, crow, ccol, bp, max_auto;
190         dbcassa_table_p dtp = 0;
191         dbcassa_column_p colp= 0;
192         dbcassa_column_p key[KEY_MAX_LEN];
193         dbcassa_column_p sec_key[KEY_MAX_LEN];
194
195         enum {DBCASSA_FLINE_ST, DBCASSA_NLINE_ST, DBCASSA_NLINE2_ST} state;
196
197         memset(key, 0, KEY_MAX_LEN*sizeof(dbcassa_column_p));
198         memset(sec_key, 0, KEY_MAX_LEN*sizeof(dbcassa_column_p));
199
200         LM_DBG("loading file [%s]\n", full_path_buf);
201         fin = fopen(full_path_buf, "rt");
202         if(!fin) {
203                 LM_ERR("Failed to open file\n");
204                 return 0;
205         }
206
207         dtp = dbcassa_table_new(tbn, dbn);
208         if(!dtp)
209                 goto done;
210
211         state = DBCASSA_FLINE_ST;
212         crow = ccol = -1;
213         c = fgetc(fin);
214         max_auto = 0;
215         while(c!=EOF) {
216                 switch(state) {
217                         case DBCASSA_FLINE_ST:
218                                 bp = 0;
219                                 while(c==DBCASSA_DELIM_C)
220                                         c = fgetc(fin);
221                                 if(c==DBCASSA_DELIM_R && !dtp->cols)
222                                         goto clean;
223                                 if(c==DBCASSA_DELIM_R) {
224                                         if(dtp->nrcols <= 0)
225                                                 goto clean;
226                                         
227                                         state = DBCASSA_NLINE_ST;
228                                         c = fgetc(fin);
229                                         break;
230                                 }
231                                 while(c!=DBCASSA_DELIM_C && c!='(' && c!=DBCASSA_DELIM_R) {
232                                         if(c==EOF)
233                                                 goto clean;
234                                         buf[bp++] = c;
235                                         c = fgetc(fin);
236                                 }
237                                 colp = dbcassa_column_new(buf, bp);
238                                 if(!colp)
239                                         goto clean;
240                                 LM_DBG("new col [%.*s]\n", bp, buf);
241                                 while(c==DBCASSA_DELIM_C)
242                                         c = fgetc(fin);
243                                 if(c!='(')
244                                         goto clean;
245                                 c = fgetc(fin);
246                                 while(c==DBCASSA_DELIM_C)
247                                         c = fgetc(fin);
248
249                                 switch(c) {
250                                         case 's':
251                                         case 'S':
252                                                 colp->type = DB1_STR;
253                                                 LM_DBG("column[%d] is STR!\n", ccol+1);
254                                         break;
255                                         case 'i':
256                                         case 'I':
257                                                 colp->type = DB1_INT;
258                                                 LM_DBG("column[%d] is INT!\n", ccol+1);
259                                         break;
260                                         case 'd':
261                                         case 'D':
262                                                 colp->type = DB1_DOUBLE;
263                                                 LM_DBG("column[%d] is DOUBLE!\n", ccol+1);
264                                         break;
265                                         case 't':
266                                         case 'T':
267                                                 colp->type = DB1_DATETIME;
268                                                 LM_DBG("column[%d] is TIME! Timestamp col has name [%s]\n", ccol+1, colp->name.s);
269                                                 if(dtp->ts_col) {
270                                                         LM_ERR("You can have only one column with type timestamp\n");
271                                                         goto clean;
272                                                 }
273                                                 dtp->ts_col = colp;
274                                         break;
275                                         default:
276                                                 LM_DBG("wrong column type!\n");
277                                                 goto clean;
278                                 }
279
280                                 while(c!='\n' && c!=EOF && c!=')' && c!= ',') {
281                                         if(colp->type == DB1_STR && (c=='i'|| c=='I')) {
282                                                 colp->type = DB1_STRING;
283                                                 LM_DBG("column[%d] is actually STRING!\n", ccol+1);
284                                         }
285                                         c = fgetc(fin);
286                                 }
287
288                                 if(c == ')') {
289                                         //LM_DBG("c=%c!\n", c);
290                                         colp->next = dtp->cols;
291                                         dtp->cols = colp;
292                                         dtp->nrcols++;
293                                         c = fgetc(fin);
294                                 }
295                                 else
296                                         goto clean;
297                                 ccol++;
298                         break;
299
300                         case DBCASSA_NLINE_ST:
301                         case DBCASSA_NLINE2_ST:
302                                 // unique key
303                                 while(c==DBCASSA_DELIM_C)
304                                         c = fgetc(fin);
305                                 if(c == DBCASSA_DELIM_R) {
306                                         state = DBCASSA_NLINE2_ST;
307                                         c = fgetc(fin);
308                                         break;
309                                 }
310
311                                 if(c == EOF)
312                                         break;
313                                 bp= 0;
314                                 while(c!=DBCASSA_DELIM_C && c!=DBCASSA_DELIM_R)
315                                 {
316                                         if(c==EOF)
317                                                 break;
318                                         buf[bp++] = c;
319                                         c = fgetc(fin);
320                                 }
321                                 colp = dtp->cols;
322                                 while(colp) {
323                                         if(bp==colp->name.len && strncmp(colp->name.s, buf, bp)==0) {
324                                                 if(state == DBCASSA_NLINE_ST)
325                                                         key[dtp->key_len++] = colp;
326                                                 else
327                                                         sec_key[dtp->seckey_len++] = colp;
328                                                 break;
329                                         }
330                                         colp = colp->next;
331                                 }
332                                 if(!colp) {
333                                         LM_ERR("Undefined column in key [%.*s]\n", bp, buf);
334                                         goto clean;
335                                 }
336                                 break;
337                 }
338         }
339
340         /* copy the keys into the table */
341         if(dtp->key_len) {
342                 dtp->key = (dbcassa_column_p*)
343                                 shm_malloc(dtp->key_len*sizeof(dbcassa_column_p));
344                 if(!dtp->key) {
345                         LM_ERR("No more share memory\n");
346                         goto clean;
347                 }
348                 for(ccol = 0; ccol< dtp->key_len; ccol++) {
349                         dtp->key[ccol] = key[ccol];
350                         LM_DBG("col [%.*s] in primary key\n", key[ccol]->name.len, key[ccol]->name.s);
351                 }
352         }
353         if(dtp->seckey_len) {
354                 dtp->sec_key = (dbcassa_column_p*)
355                                 shm_malloc(dtp->seckey_len*sizeof(dbcassa_column_p));
356                 if(!dtp->sec_key) {
357                         LM_ERR("No more share memory\n");
358                         goto clean;
359                 }
360                 for(ccol = 0; ccol< dtp->seckey_len; ccol++) {
361                         dtp->sec_key[ccol] = sec_key[ccol];
362                         LM_DBG("col [%.*s] in secondary key\n", sec_key[ccol]->name.len, sec_key[ccol]->name.s);
363                 }
364         }
365
366 done:
367         if(fin)
368                 fclose(fin);
369         return dtp;
370 clean:
371         if(fin)
372                 fclose(fin);
373         if(dtp)
374                 dbcassa_table_free(dtp);
375         return NULL;
376 }
377
378
/*
 * Reader/writer helpers operating on an rw_lock_t lvalue.
 *
 * ref_read_data()    - wait until no writer is flagged, then take a read
 *                      reference (data_refcnt++).
 * unref_read_data()  - drop a read reference.
 * ref_write_data()   - flag a pending writer and wait until all read
 *                      references are gone.
 * unref_write_data() - clear the writer flag.
 *
 * All four expand to a single statement, so they are safe in unbraced
 * if/else bodies, and ref_read_data() uses a loop instead of a label so
 * it can appear more than once in the same function.
 *
 * NOTE(review): ref_write_data() does not wait for an already set
 * reload_flag, so two writers are not excluded from each other - confirm
 * whether callers rely on anything else for that.
 */
#define ref_read_data(rw_lock) \
do {\
	for(;;) { \
		lock_get( &(rw_lock).lock ); \
		if (!(rw_lock).reload_flag) \
			break; \
		lock_release( &(rw_lock).lock ); \
		usleep(5); \
	} \
	(rw_lock).data_refcnt++; \
	lock_release( &(rw_lock).lock ); \
} while(0)


#define unref_read_data(rw_lock) \
do {\
	lock_get( &(rw_lock).lock ); \
	(rw_lock).data_refcnt--; \
	lock_release( &(rw_lock).lock ); \
} while(0)


#define ref_write_data(rw_lock)\
do {\
	lock_get( &(rw_lock).lock ); \
	(rw_lock).reload_flag = 1; \
	lock_release( &(rw_lock).lock ); \
	while ((rw_lock).data_refcnt) \
		usleep(10); \
} while(0)


#define unref_write_data(rw_lock)\
do {\
	(rw_lock).reload_flag = 0; \
} while(0)
413
414 /*
415  *      Search the table schema
416  * */
417 dbcassa_table_p dbcassa_db_search_table(int hashidx, int hash,
418                 const str* dbn, const str *tbn)
419 {
420         dbcassa_table_p tbc = NULL;
421         ref_read_data(dbcassa_tbl_htable[hashidx].lock);
422
423         tbc = dbcassa_tbl_htable[hashidx].dtp;
424         while(tbc) {
425                 LM_DBG("found dbname=%.*s, table=%.*s\n", tbc->dbname.len, tbc->dbname.s, tbc->name.len, tbc->name.s);
426                 if(tbc->hash==hash && tbc->dbname.len == dbn->len
427                         && tbc->name.len == tbn->len
428                         && !strncasecmp(tbc->dbname.s, dbn->s, dbn->len)
429                         && !strncasecmp(tbc->name.s, tbn->s, tbn->len))
430                         return tbc;
431                 tbc = tbc->next;
432         }
433         unref_read_data(dbcassa_tbl_htable[hashidx].lock);
434         return NULL;
435 }
436
437
438 /**
439  * Get the table schema. If the file was updated, update the table schema.
440  */
441 dbcassa_table_p dbcassa_db_get_table(const str* dbn, const str *tbn)
442 {
443         dbcassa_table_p tbc = NULL, old_tbc= NULL, new_tbc= NULL, prev_tbc= NULL;
444         int hash;
445         int hashidx;
446         int len;
447
448         if(!dbn || !tbn ) {
449                 LM_ERR("invalid parameter");
450                 return NULL;
451         }
452
453         hash = core_hash(dbn, tbn, DBCASSA_TABLE_SIZE);
454         hashidx = hash % DBCASSA_TABLE_SIZE;
455
456         ref_read_data(dbcassa_tbl_htable[hashidx].lock);
457
458         tbc = dbcassa_tbl_htable[hashidx].dtp;
459
460         while(tbc) {
461                 LM_DBG("found dbname=%.*s, table=%.*s\n", tbc->dbname.len, tbc->dbname.s, tbc->name.len, tbc->name.s);
462                 if(tbc->hash==hash && tbc->dbname.len == dbn->len
463                                 && tbc->name.len == tbn->len
464                                 && !strncasecmp(tbc->dbname.s, dbn->s, dbn->len)
465                                 && !strncasecmp(tbc->name.s, tbn->s, tbn->len)) {
466
467                         memcpy(full_path_buf + dbcassa_schema_path.len, dbn->s, dbn->len);
468                         len = dbcassa_schema_path.len + dbn->len;
469                         full_path_buf[len++] = '/';
470                         memcpy(full_path_buf + len, tbn->s, tbn->len);
471                         full_path_buf[len + tbn->len] = '\0';
472
473                         if(dbcassa_check_mtime(&tbc->mt) == 0)
474                                 return tbc;
475                         old_tbc = tbc;
476                         break;
477                 }
478                 tbc = tbc->next;
479         }
480         unref_read_data(dbcassa_tbl_htable[hashidx].lock);
481         if(!old_tbc)
482                 return NULL;
483
484         /* the file has changed - load again the schema */
485         new_tbc = dbcassa_load_file((str*)dbn, (str*)tbn);
486         if(!new_tbc)
487         {
488                 LM_ERR("could not load database from file [%.*s]\n", tbn->len, tbn->s);
489                 return NULL;
490         }
491         new_tbc->hash = hashidx;
492
493         /* lock for write */
494         ref_write_data(dbcassa_tbl_htable[hashidx].lock);
495         tbc = dbcassa_tbl_htable[hashidx].dtp;
496
497         while(tbc) {
498                 if(tbc == old_tbc)
499                         break;
500                 prev_tbc = tbc;
501                 tbc = tbc->next;
502         }
503
504         /* somebody else might have rewritten it in the mean time? just return the existing one */
505         if(!tbc) {
506                 unref_write_data(dbcassa_tbl_htable[hashidx].lock);
507                 return dbcassa_db_search_table(hashidx, hash, dbn, tbn);
508         }
509
510         /* replace the table */
511         new_tbc->next = old_tbc->next;
512         if(prev_tbc)
513                 prev_tbc->next = new_tbc;
514         else
515                 dbcassa_tbl_htable[hashidx].dtp = new_tbc;
516         dbcassa_table_free(old_tbc);
517         unref_write_data(dbcassa_tbl_htable[hashidx].lock);
518
519         /* lock for read, search the table and return */
520         return dbcassa_db_search_table(hashidx, hash, dbn, tbn);
521 }
522
523 /*
524  *      Read all table schemas at startup
525  * */
526 int dbcassa_read_table_schemas(void)
527 {
528         int i, j;
529         str db_name, tb_name;
530         DIR* srcdir = opendir(dbcassa_schema_path.s);
531         DIR* db_dir;
532         struct dirent* dent;
533         int fn_len = dbcassa_schema_path.len;
534         struct stat fstat;
535         int dir_len;
536         dbcassa_table_p tbc;
537         unsigned int hashidx;
538
539         /* init tables' hash table */
540         if (!dbcassa_tbl_htable) {
541                 dbcassa_tbl_htable = (dbcassa_tbl_htable_p)shm_malloc(DBCASSA_TABLE_SIZE*
542                                         sizeof(dbcassa_tbl_htable_t));
543                 if(dbcassa_tbl_htable==NULL)
544                 {
545                         LM_CRIT("no enough shm mem\n");
546                         return -1;
547                 }
548                 memset(dbcassa_tbl_htable, 0, DBCASSA_TABLE_SIZE*sizeof(dbcassa_tbl_htable_t));
549                 for(i=0; i<DBCASSA_TABLE_SIZE; i++)
550                 {
551                         if (lock_init(&dbcassa_tbl_htable[i].lock.lock)==0)
552                         {
553                                 LM_CRIT("cannot init tables' sem's\n");
554                                 for(j=i-1; j>=0; j--)
555                                         lock_destroy(&dbcassa_tbl_htable[j].rw_lock.lock);
556                                 return -1;
557                         }
558                 }
559         }
560
561         memset(full_path_buf, 0, _POSIX_PATH_MAX);
562         strcpy(full_path_buf, dbcassa_schema_path.s);
563         if (full_path_buf[dbcassa_schema_path.len - 1] != '/') {
564                 full_path_buf[fn_len++]= '/';
565                 dbcassa_schema_path.len++;
566         }
567
568         if (srcdir == NULL) {
569                 perror("opendir");
570                 return -1;
571         }
572         LM_DBG("Full name= %.*s\n", fn_len, full_path_buf);
573
574         while((dent = readdir(srcdir)) != NULL)
575         {
576                 if(strcmp(dent->d_name, ".") == 0 || strcmp(dent->d_name, "..") == 0)
577                         continue;
578
579                 /* Calculate full name, check we are in file length limits */
580                 if ((fn_len + strlen(dent->d_name) + 1) > _POSIX_PATH_MAX)
581                         continue;
582
583                 db_name.s = dent->d_name;
584                 db_name.len = strlen(dent->d_name);
585                 
586                 strcpy(full_path_buf+fn_len, dent->d_name);
587                 dir_len = fn_len + db_name.len;
588
589                 LM_DBG("Full dir name= %.*s\n", dir_len, full_path_buf);
590
591                 if (stat(full_path_buf, &fstat) < 0) {
592                         LM_ERR("stat failed %s\n", strerror(errno));
593                         continue;
594                 }
595
596                 if (!S_ISDIR(fstat.st_mode))  {
597                         LM_ERR("not a directory\n");
598                         continue;
599                 }
600
601                 /*
602                 if (fstatat(dirfd(srcdir), dent->d_name, &st) < 0)
603                 {
604                         perror(dent->d_name);
605                         continue;
606                 }
607                 */
608
609                 LM_DBG("Found database %s\n", dent->d_name);
610                 db_dir = opendir(full_path_buf);
611                 if(!db_dir) {
612                         LM_ERR("Failed to open dictory %s\n", full_path_buf);
613                         continue;
614                 }
615                 full_path_buf[dir_len++]= '/';
616                 while((dent = readdir(db_dir)) != NULL)
617                 {
618                         if(strcmp(dent->d_name, ".") == 0 || strcmp(dent->d_name, "..") == 0)
619                                 continue;
620                         LM_DBG("database table %s\n", dent->d_name);
621                         if(dir_len + strlen(dent->d_name)+1 > _POSIX_PATH_MAX) {
622                                 LM_ERR("File len too large\n");
623                                 continue;
624                         }
625                         strcpy(full_path_buf+dir_len, dent->d_name);
626
627                         tb_name.s = dent->d_name;
628                         tb_name.len = strlen(dent->d_name);
629
630                         LM_DBG("File path= %s\n", full_path_buf);
631                         tbc = dbcassa_load_file(&db_name, &tb_name);
632                         if(!tbc)
633                         {
634                                 LM_ERR("could not load database from file [%s]\n", tb_name.s);
635                                 return -1;
636                         }
637                         hashidx = core_hash(&db_name, &tb_name, DBCASSA_TABLE_SIZE);
638                         tbc->hash = hashidx;
639                         tbc->next = dbcassa_tbl_htable[hashidx].dtp;
640                         dbcassa_tbl_htable[hashidx].dtp = tbc;
641                 }
642                 closedir(db_dir);
643         }
644         closedir(srcdir);
645
646
647         return 0;
648 }
649
650 /*
651  *      Destroy table schema table at shutdown
652  * */
653 void dbcassa_destroy_htable(void)
654 {
655         int i;
656         dbcassa_table_p tbc, tbc0;
657
658         /* destroy tables' hash table*/
659         if(dbcassa_tbl_htable==0)
660                 return;
661
662         for(i=0; i<DBCASSA_TABLE_SIZE; i++) {
663                 lock_destroy(&dbcassa_tbl_htable[i].rw_lock.lock);
664                 tbc = dbcassa_tbl_htable[i].dtp;
665                 while(tbc) {
666                         tbc0 = tbc;
667                         tbc = tbc->next;
668                         dbcassa_table_free(tbc0);
669                 }
670         }
671         shm_free(dbcassa_tbl_htable);
672 }
673
674 void dbcassa_lock_release(dbcassa_table_p tbc)
675 {
676         unref_read_data(dbcassa_tbl_htable[tbc->hash].lock);
677 }