db_cassandra: another doxygen fix
[sip-router] / src / modules / db_cassandra / dbcassa_base.cpp
1 /*
2  * $Id$
3  *
4  * CASSANDRA module interface
5  *
6  * Copyright (C) 2012 1&1 Internet AG
7  *
8  * This file is part of Kamailio, a free SIP server.
9  *
10  * Kamailio is free software; you can redistribute it and/or modify
11  * it under the terms of the GNU General Public License as published by
12  * the Free Software Foundation; either version 2 of the License, or
13  * (at your option) any later version
14  *
15  * Kamailio is distributed in the hope that it will be useful,
16  * but WITHOUT ANY WARRANTY; without even the implied warranty of
17  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
18  * GNU General Public License for more details.
19  *
20  * You should have received a copy of the GNU General Public License 
21  * along with this program; if not, write to the Free Software 
22  * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301  USA
23  *
24  * History:
25  * --------
26  * 2012-01  first version (Anca Vamanu)
27  * 2012-09  Added support for CQL queries (Boudewyn Ligthart)
28  */
29
30 #include <stdio.h>
31 #include <stdlib.h>
32 #include <unistd.h>
33 #include <sys/time.h>
34 #include <poll.h>
35 #include <iostream>
36 #include <boost/lexical_cast.hpp>
37 #include <protocol/TBinaryProtocol.h>
38 #include <transport/TSocket.h>
39 #include <transport/TTransportUtils.h>
40
41 extern "C" {
42 #include "../../core/timer.h"
43 #include "../../core/mem/mem.h"
44 #include "dbcassa_table.h"
45 }
46
47 #include "Cassandra.h"
48 #include "dbcassa_base.h"
49
50 namespace at  = apache::thrift;
51 namespace att = apache::thrift::transport;
52 namespace atp = apache::thrift::protocol;
53 namespace oac = org::apache::cassandra;
54
/* Delimiter written between the parts of a composed key (see cassa_constr_key())
 * and between the secondary key and the column name in Cassandra column names. */
55 static const char cassa_key_delim  = ' ';
/* Size in bytes of the stack buffers that hold a composed row/secondary key. */
56 static const int  cassa_max_key_len= 512;
57
58 #define MAX_ROWS_NO        128     /* TODO: make this configurable or dynamic */
/* Row bookkeeping for a query result: row_slices[r][0] is the index one past
 * the last column of row r in the flat column vector, row_slices[r][1] is the
 * length of the key prefix to skip in the column names of that row. */
59 int row_slices[MAX_ROWS_NO][2];
60
61 /*
62  * ----         Cassandra Connection Section               ----
63  *  */
64
/*!
 * \brief Connection state kept by the db_cassandra module.
 *
 * db_cassa_free_connection() receives this object as a struct pool_con*
 * and casts it back, so the leading members (id, ref, next) presumably have
 * to mirror the generic pool connection layout -- TODO confirm against the
 * definition of struct pool_con.
 */
65 struct cassa_con {
66         struct db_id* id;           /*!< Connection identifier       */
67         unsigned int ref;           /*!< Reference count             */
68         struct pool_con* next;      /*!< Next connection in the pool */
69
70         str db_name;                /*!< Database name as str        */
71         oac::CassandraClient* con;  /*!< Cassandra connection        */
72 };
73
/*! Access the struct cassa_con stored in the 'tail' member of a DB handle. */
74 #define CON_CASSA(db_con)    ((struct cassa_con*)db_con->tail)
75
76 /*!
77  * \brief Open connection to Cassandra cluster
78  * \param id database id
79  * \return zero on succes
80  */
81 oac::CassandraClient* dbcassa_open(struct db_id* id)
82 {
83         try {
84                 boost::shared_ptr<att::TSocket> socket(new att::TSocket(id->host, id->port));
85                 boost::shared_ptr<att::TTransport> transport(new att::TFramedTransport (socket));
86                 boost::shared_ptr<atp::TProtocol> protocol(new atp::TBinaryProtocol(transport));
87
88                 socket->setConnTimeout(cassa_conn_timeout);
89                 socket->setSendTimeout(cassa_send_timeout);
90                 socket->setRecvTimeout(cassa_recv_timeout);
91
92                 std::auto_ptr<oac::CassandraClient> cassa_client(new oac::CassandraClient(protocol));
93
94                 transport->open();
95                 if (!transport->isOpen()) {
96                         LM_ERR("Failed to open transport to Cassandra\n");
97                         return 0;
98                 }
99
100                 /* database name ->  keyspace */
101
102                 cassa_client->set_keyspace(id->database);
103                 if(id->username && id->password) {
104                         oac::AuthenticationRequest au_req;
105                         std::map<std::string, std::string>  cred;
106                         cred.insert(std::pair<std::string, std::string>("username", id->username));
107                         cred.insert(std::pair<std::string, std::string>("password", id->password));
108                         au_req.credentials = cred;
109                         try {
110                                 cassa_client->login(au_req);
111                         } catch (const oac::AuthenticationException& autx) {
112                                 LM_ERR("Authentication failure: Credentials not valid, %s\n", autx.why.c_str());
113                         } catch (const oac::AuthorizationException & auzx) {
114                                 LM_ERR("Authentication failure: Credentials not valid for the selected database, %s\n", auzx.why.c_str());
115                         }
116                 }
117
118                 LM_DBG("Opened connection to Cassandra cluster  %s:%d\n", id->host, id->port);
119                 return cassa_client.release();
120
121         } catch (const oac::InvalidRequestException &irx) {
122                 LM_ERR("Database does not exist %s, %s\n", id->database, irx.why.c_str());
123         } catch (const at::TException &tx) {
124                 LM_ERR("Failed to open connection to Cassandra cluster %s:%d, %s\n",
125                                 id->database, id->port, tx.what());
126         } catch (const std::exception &ex) {
127                 LM_ERR("Failed: %s\n", ex.what());
128         } catch (...) {
129                 LM_ERR("Failed to open connection to Cassandra cluster\n");
130         }
131
132         return 0;
133 }
134
135 /*!
136  * \brief Create new DB connection structure
137  * \param id database id
138  */
139 void* db_cassa_new_connection(struct db_id* id)
140 {
141         struct cassa_con* ptr;
142
143         if (!id) {
144                 LM_ERR("invalid db_id parameter value\n");
145                 return 0;
146         }
147
148         if (id->port) {
149                 LM_DBG("opening connection: cassa://xxxx:xxxx@%s:%d/%s\n", ZSW(id->host),
150                         id->port, ZSW(id->database));
151         } else {
152                 LM_DBG("opening connection: cassa://xxxx:xxxx@%s/%s\n", ZSW(id->host),
153                         ZSW(id->database));
154         }
155
156         ptr = (struct cassa_con*)pkg_malloc(sizeof(struct cassa_con));
157         if (!ptr) {
158                 LM_ERR("failed trying to allocated %lu bytes for connection structure."
159                                 "\n", (unsigned long)sizeof(struct cassa_con));
160                 return 0;
161         }
162         LM_DBG("%p=pkg_malloc(%lu)\n", ptr, (unsigned long)sizeof(struct cassa_con));
163
164         memset(ptr, 0, sizeof(struct cassa_con));
165
166         ptr->db_name.s = id->database;
167         ptr->db_name.len = strlen(id->database);
168         ptr->id = id;
169         ptr->ref = 1;
170
171         ptr->con = dbcassa_open(id);
172         if(!ptr->con) {
173                 LM_ERR("Failed to open connection to Cassandra cluster\n");
174                 pkg_free(ptr);
175                 return 0;
176         }
177         return ptr;
178 }
179
180
181 /*!
182  * \brief Close Cassandra connection
183  * \param con Cassandra connection
184  */
185 void dbcassa_close(oac::CassandraClient* con)
186 {
187         if(! con) return;
188
189         delete con;
190 }
191
192 /*!
193  * \brief Close the connection and release memory
194  * \param con connection structure
195  */
196 void db_cassa_free_connection(struct pool_con* con)
197 {
198         struct cassa_con * _c;
199
200         if (!con) return;
201
202         _c = (struct cassa_con*) con;
203         dbcassa_close(_c->con);
204         pkg_free(_c);
205 }
206
207 /*!
208  * \brief Reconnect to Cassandra cluster
209  * \param con connection structure
210  */
211 void dbcassa_reconnect(struct cassa_con* con)
212 {
213         dbcassa_close(con->con);
214         con->con = dbcassa_open(con->id);
215 }
216
217
218 /*
219  * ----              DB Operations Section                          ----
220  * */
221
222 /*
223  *      Util functions
224  * */
225 static int cassa_get_res_col(std::vector<oac::ColumnOrSuperColumn> result, int r_si, int r_fi, int prefix_len, db_key_t qcol)
226 {
227         str res_col_name;
228
229         for (int i = r_si; i< r_fi; i++) {
230                 res_col_name.s = (char*)result[i].column.name.c_str()+prefix_len;
231                 res_col_name.len = (int)result[i].column.name.size() - prefix_len;
232
233                 if(res_col_name.len == qcol->len &&
234                                 strncmp(res_col_name.s, qcol->s, qcol->len )==0)
235                         return i;
236         }
237         return -1;
238 }
239
240 static int cassa_convert_result(db_key_t qcol, std::vector<oac::ColumnOrSuperColumn> result,
241                 int r_si, int r_fi, int prefix_len, db_val_t* sr_cell)
242 {
243         str col_val;
244         int idx_rescol;
245         oac::Column res_col;
246
247         idx_rescol = cassa_get_res_col(result, r_si, r_fi, prefix_len, qcol);
248         if(idx_rescol< 0) {
249                 LM_DBG("Column not found in result %.*s\n", qcol->len, qcol->s);
250                 sr_cell->nul  = 1;
251                 return 0;
252         }
253         res_col = result[idx_rescol].column;
254
255         col_val.s = (char*)res_col.value.c_str();
256
257         if(!col_val.s) {
258                 LM_DBG("Column not found in result %.*s- NULL\n", qcol->len, qcol->s);
259                 sr_cell->nul  = 1;
260                 return 0;
261         }
262         col_val.len = strlen(col_val.s);
263
264         sr_cell->nul  = 0;
265         sr_cell->free  = 0;
266
267         switch (sr_cell->type) {
268                 case DB1_INT:
269                         if(str2int(&col_val, (unsigned int*)&sr_cell->val.int_val) < 0) {
270                                 LM_ERR("Wrong value [%s] - len=%d, expected integer\n", col_val.s, col_val.len);
271                                 return -1;
272                         }
273                         break;
274                 case DB1_BIGINT:
275                         if(sscanf(col_val.s, "%lld", &sr_cell->val.ll_val) < 0) {
276                                 LM_ERR("Wrong value [%s], expected integer\n", col_val.s);
277                                 return -1;
278                         }
279                         break;
280                 case DB1_DOUBLE:
281                         if(sscanf(col_val.s, "%lf", &sr_cell->val.double_val) < 0) {
282                                 LM_ERR("Wrong value [%s], expected integer\n", col_val.s);
283                                 return -1;
284                         }
285                         break;
286                 case DB1_STR:
287                         pkg_str_dup(&sr_cell->val.str_val, &col_val);
288                         sr_cell->free  = 1;
289                         break;
290                 case DB1_STRING:
291                         col_val.len++;
292                         pkg_str_dup(&sr_cell->val.str_val, &col_val);
293                         sr_cell->val.str_val.len--;
294                         sr_cell->val.str_val.s[col_val.len-1]='\0';
295                         sr_cell->free  = 1;
296                         break;
297                 case DB1_BLOB:
298                         pkg_str_dup(&sr_cell->val.blob_val, &col_val);
299                         sr_cell->free  = 1;
300                         break;
301                 case DB1_BITMAP:
302                         if(str2int(&col_val, &sr_cell->val.bitmap_val) < 0) {
303                                 LM_ERR("Wrong value [%s], expected integer\n", col_val.s);
304                                 return -1;
305                         }
306                         break;
307                 case DB1_DATETIME:
308                         if(sscanf(col_val.s, "%ld", (long int*)&sr_cell->val.time_val) < 0) {
309                                 LM_ERR("Wrong value [%s], expected integer\n", col_val.s);
310                                 return -1;
311                         }
312                         break;
313                 default:
314                         LM_ERR("unknown data type\n");
315                         return -1;
316         }
317         return 0;
318 }
319
320 static char* dbval_to_string(db_val_t dbval, char* pk)
321 {
322         switch(dbval.type) {
323                 case DB1_STRING: strcpy(pk, dbval.val.string_val);
324                                                    pk+= strlen(dbval.val.string_val);
325                                                    break;
326                 case DB1_STR:    memcpy(pk, dbval.val.str_val.s, dbval.val.str_val.len);
327                                                    pk+= dbval.val.str_val.len;
328                                                    break;
329                 case DB1_INT:    pk+= sprintf(pk, "%d", dbval.val.int_val);
330                                                    break;
331                 case DB1_BIGINT: pk+= sprintf(pk, "%lld", dbval.val.ll_val);
332                                                    break;
333                 case DB1_DOUBLE: pk+= sprintf(pk, "%lf", dbval.val.double_val);
334                                                    break;
335                 case DB1_BLOB:   pk+= sprintf(pk, "%.*s", dbval.val.blob_val.len, dbval.val.blob_val.s);
336                                                    break;
337                 case DB1_BITMAP: pk+= sprintf(pk, "%u", dbval.val.bitmap_val);
338                                                    break;
339                 case DB1_DATETIME:pk+= sprintf(pk, "%ld", (long int)dbval.val.time_val);
340                                                   break;
341                 default: LM_ERR("unknown data type\n");
342         }
343         return pk;
344 }
345
346
347 int cassa_constr_key( const db_key_t* _k, const db_val_t* _v,
348                 int _n, int key_len, dbcassa_column_p* key_array, int *no_kc, char* key)
349 {
350         int i, j;
351         char* pk = key;
352
353         if(!key_array)
354                 return 0;
355
356         for(j = 0; j< _n; j++) {
357                 LM_DBG("query col = %.*s\n",  _k[j]->len,  _k[j]->s);
358         }
359
360         for(i = 0; i< key_len; i++) {
361                 /* look in the received columns to search the key column */
362                 for(j = 0; j< _n; j++) {
363                         if(_k[j]->len == key_array[i]->name.len &&
364                                         !strncmp(_k[j]->s, key_array[i]->name.s, _k[j]->len))
365                                 break;
366                 }
367                 if(j == _n) {
368                         LM_DBG("The key column with name [%.*s] not found in values\n", key_array[i]->name.len, key_array[i]->name.s);
369                         break;
370                 }
371                 pk= dbval_to_string(_v[j], pk);
372                 *(pk++) = cassa_key_delim;
373         }
374         if(pk > key)
375                 *(--pk) = '\0';
376         else
377                 *key = '\0';
378
379         if(no_kc)
380                 *no_kc = i;
381
382         LM_DBG("key = %s\n", key);
383
384         return pk - key;
385 }
386
387
388 int cassa_result_separate_rows(std::vector<oac::ColumnOrSuperColumn> result) {
389         int rows_no =0, i = 0;
390         int res_size = result.size();
391
392         while(i< res_size) {
393                 size_t found;
394                 std::string curr_seckey;
395
396                 found = result[i].column.name.find(cassa_key_delim);
397                 if(found< 0) {
398                         LM_ERR("Wrong formated column name - secondary key part not found [%s]\n",
399                                         result[i].column.name.c_str());
400                         return -1;
401                 }
402                 curr_seckey = result[i].column.name.substr(0, found);
403
404                 while(++i < res_size) {
405                         if(result[i].column.name.compare(0, found, curr_seckey)) {
406                                 LM_DBG("Encountered a new secondary key %s - %s\n", result[i].column.name.c_str(), curr_seckey.c_str());
407                                 break;
408                         }
409                 }
410                 /* the current row stretches until index 'i' and the corresponding key prefix has length 'found' */
411                 row_slices[rows_no][0] = i;
412                 row_slices[rows_no][1] = found +1;
413                 rows_no++;
414         }
415
416         /* debug messages */
417         for(int i = 0; i< rows_no; i++) {
418                 LM_DBG("Row %d until index %d with prefix len %d\n", i, row_slices[i][0], row_slices[i][1]);
419         }
420
421         return rows_no;
422 }
423
424 dbcassa_column_p cassa_search_col(dbcassa_table_p tbc, db_key_t col_name)
425 {
426         dbcassa_column_p colp;
427
428         colp = tbc->cols;
429         while(colp) {
430                 if(colp->name.len == col_name->len && !strncmp(colp->name.s, col_name->s, col_name->len))
431                         return colp;
432                 colp = colp->next;
433         }
434         return 0;
435 }
436
437 typedef std::vector<oac::ColumnOrSuperColumn>  ColumnVec;
438 typedef std::auto_ptr<ColumnVec>  ColumnVecPtr;
439
440 ColumnVecPtr cassa_translate_query(const db1_con_t* _h, const db_key_t* _k,
441                 const db_val_t* _v, const db_key_t* _c, int _n, int _nc, int* ret_rows_no)
442 {
443         char row_key[cassa_max_key_len];
444         char sec_key[cassa_max_key_len];
445         int key_len=0, seckey_len = 0;
446         int no_kc, no_sec_kc;
447         dbcassa_table_p tbc;
448         char pk[256];
449
450         /** Lock table schema and construct primary and secondary key **/
451         if(_k) {
452                 tbc = dbcassa_db_get_table(&CON_CASSA(_h)->db_name, CON_TABLE(_h));
453                 if(!tbc) {
454                         LM_ERR("table %.*s does not exist!\n", CON_TABLE(_h)->len, CON_TABLE(_h)->s);
455                         return ColumnVecPtr(NULL);
456                 }
457                 cassa_constr_key(_k, _v, _n, tbc->key_len, tbc->key, &no_kc, row_key);
458
459                 if(no_kc != tbc->key_len) {/* was not able to construct the whole key */
460                         LM_ERR("Query not supported - key not provided\n");
461                         dbcassa_lock_release(tbc);
462                         return ColumnVecPtr(NULL);
463                 }
464                 key_len = tbc->key_len;
465
466                 cassa_constr_key(_k, _v, _n, tbc->seckey_len, tbc->sec_key, &no_sec_kc, sec_key);
467                 seckey_len = tbc->seckey_len;
468
469                 dbcassa_lock_release(tbc);
470         }
471
472         try {
473                 oac::SlicePredicate sp;
474                 if(seckey_len) { // seckey defined for this table
475                         if(no_sec_kc == seckey_len) { // was able to build the complete secondary key
476                                 if(_c) { /* if queried for specific columns */
477                                         /* query for the specific columns */
478                                         for(int i=0; i< _nc; i++) {
479                                                 std::string col_name = sec_key;
480                                                 col_name.push_back(cassa_key_delim);
481                                                 col_name.append(_c[i]->s);
482                                                 sp.column_names.push_back(col_name);
483                                                 LM_DBG("Query col: %s\n", col_name.c_str());
484                                         }
485                                         sp.__isset.column_names = true; // set
486                                 } else { /* query for columns starting with this secondary key */
487                                         oac::SliceRange sr;
488                                         sr.start = sec_key;
489                                         sr.start.push_back(cassa_key_delim);
490                                         sr.finish = sec_key;
491                                         sr.finish.push_back(cassa_key_delim +1);
492                                         sp.slice_range = sr;
493                                         sp.__isset.slice_range = true; // set
494                                 }
495                         } else {  /* query all columns */
496                                 oac::SliceRange sr;
497                                 sr.start = "";
498                                 sr.finish = "";
499                                 sp.slice_range = sr;
500                                 sp.__isset.slice_range = true; // set
501                         }
502                 } else { /* the table doesn't have any secondary key defined */
503                         if(_c) {
504                                 for(int i=0; i< _nc; i++) {
505                                         /*sp.column_names.push_back(_c[i]->s);*/
506                                         if(_c[i]->len>255) {
507                                                 LM_ERR("column key is too long [%.*s]\n", _c[i]->len, _c[i]->s);
508                                                 return ColumnVecPtr(NULL);
509                                         }
510                                         memcpy(pk, _c[i]->s, _c[i]->len);
511                                         pk[_c[i]->len] = '\0';
512                                         sp.column_names.push_back(pk);
513                                         LM_DBG("Query col: %s\n", _c[i]->s);
514                                 }
515                                 LM_DBG("get %d columns\n", _nc);
516                                 sp.__isset.column_names = true; // set
517                         } else {
518                                 /* return all columns */
519                                 oac::SliceRange sr;
520                                 sr.start = "";
521                                 sr.finish = "";
522                                 sp.slice_range = sr;
523                                 sp.__isset.slice_range = true; // set
524                                 LM_DBG("get all columns\n");
525                         }
526                 }
527
528                 unsigned int retr = 0;
529                 oac::ColumnParent cparent;
530                 cparent.column_family = _h->table->s;
531                 ColumnVecPtr cassa_result(new std::vector<oac::ColumnOrSuperColumn>);
532                 do {
533                         if(CON_CASSA(_h)->con) {
534                                 try {
535
536                                         if(_k) {
537                                                 CON_CASSA(_h)->con->get_slice(*cassa_result, row_key, cparent, sp, oac::ConsistencyLevel::ONE);
538                                                 *ret_rows_no = 1;
539                                         } else {
540                                                 oac::KeyRange keyRange;
541                                                 keyRange.start_key = "";
542                                                 keyRange.start_key = "";
543                                                 std::vector<oac::KeySlice> key_slice_vect;
544                                                 keyRange.__isset.start_key = 1;
545                                                 keyRange.__isset.end_key = 1;
546                                                 ColumnVec::iterator it = cassa_result->begin();
547
548                                                 /* get in a loop 100 records at a time */
549                                                 int rows_no =0;
550                                                 while(1) {
551                                                         CON_CASSA(_h)->con->get_range_slices(key_slice_vect, cparent, sp, keyRange, oac::ConsistencyLevel::ONE);
552                                                         /* construct cassa_result */
553                                                         LM_DBG("Retuned %d key slices\n", (int)key_slice_vect.size());
554                                                         for(unsigned int i = 0; i< key_slice_vect.size(); i++) {
555                                                                 if(key_slice_vect[i].columns.size()==0) {
556                                                                         continue;
557                                                                 }
558                                                                 cassa_result->insert(it, key_slice_vect[i].columns.begin(), key_slice_vect[i].columns.end());
559                                                                 it = cassa_result->begin();
560                                                                 row_slices[rows_no][0] = cassa_result->size();
561                                                                 row_slices[rows_no][1] = 0;
562                                                                 rows_no++;
563                                                         }
564                                                         if(key_slice_vect.size() < (unsigned int)keyRange.count)
565                                                                 break;
566                                                 }
567
568                                                 *ret_rows_no = rows_no;
569                                         }
570
571                                         return cassa_result;
572                                 } catch (const att::TTransportException &tx) {
573                                         LM_ERR("Failed to query: %s\n", tx.what());
574                                 }
575                         }
576                         dbcassa_reconnect(CON_CASSA(_h));
577                 } while(cassa_auto_reconnect && retr++ < cassa_retries);
578                 LM_ERR("Failed to connect, retries exceeded.\n");
579         } catch (const oac::InvalidRequestException ir) {
580                 LM_ERR("Failed Invalid query request: %s\n", ir.why.c_str());
581         } catch (const at::TException &tx) {
582                 LM_ERR("Failed generic Thrift error: %s\n", tx.what());
583         } catch (const std::exception &ex) {
584                 LM_ERR("Failed std error: %s\n", ex.what());
585         } catch (...) {
586                 LM_ERR("Failed generic error\n");
587         }
588
589         LM_DBG("Query with get slice no_kc=%d tbc->key_len=%d  _n=%d\n", no_kc, key_len,_n);
590         return ColumnVecPtr(NULL);
591 }
592
593
594 /** 
595  *  This function check the CQLresult of the CQL query and   
596  *  adds the columns to the returning result structure. 
597  *
598  * \param _cql_res  handle for the CQLResult
599  * \param _r result set for storage
600  * \param tbc cassandra database table
601  * \return zero on success, negative value on failure
602  */
603 int cql_get_columns(oac::CqlResult& _cql_res, db1_res_t* _r, dbcassa_table_p tbc)
604 {
605         std::vector<oac::CqlRow>  res_cql_rows = _cql_res.rows;
606         int rows_no = res_cql_rows.size();
607         int cols_no = 0;
608
609         LM_DBG("cqlrow Vector size =%d\n", rows_no);
610         
611         if (rows_no > 0) {
612                 cols_no = res_cql_rows[0].columns.size();
613                 LM_DBG("There are %d columns available, this should be the case for all %d rows (consider cql).\n", cols_no, rows_no);
614         } else {
615                 LM_DBG("Got 0 rows. There is no result from the query.\n");
616                 return 0;
617         }
618
619         RES_COL_N(_r) = cols_no;
620         if (!RES_COL_N(_r)) {
621                 LM_ERR("no columns returned from the query\n");
622                 return -2;
623         } else {
624                 LM_DBG("%d columns returned from the query\n", RES_COL_N(_r));
625         }
626
627         if (db_allocate_columns(_r, RES_COL_N(_r)) != 0) {
628                 LM_ERR("Could not allocate columns\n");
629                 return -3;
630         }
631
632         /* For fields we will use the columns inside the first columns */
633
634         for(int col = 0; col < RES_COL_N(_r); col++) {
635                 RES_NAMES(_r)[col] = (str*)pkg_malloc(sizeof(str));
636                 if (! RES_NAMES(_r)[col]) {
637                         LM_ERR("no private memory left\n");
638                         RES_COL_N(_r) = col;
639                         db_free_columns(_r);
640                         return -4;
641                 }
642                 LM_DBG("Allocated %lu bytes for RES_NAMES[%d] at %p\n",
643                         (unsigned long)sizeof(str), col, RES_NAMES(_r)[col]);
644
645                 /* The pointer that is here returned is part of the result structure. */
646                 RES_NAMES(_r)[col]->s = (char*) res_cql_rows[0].columns[col].name.c_str();
647                 RES_NAMES(_r)[col]->len = strlen(RES_NAMES(_r)[col]->s);
648
649                 /* search the column in table schema to get the type */
650                 dbcassa_column_p colp = cassa_search_col(tbc, (db_key_t) RES_NAMES(_r)[col]);
651                 if(!colp) {
652                         LM_ERR("No column with name [%.*s] found\n", RES_NAMES(_r)[col]->len, RES_NAMES(_r)[col]->s);
653                         RES_COL_N(_r) = col;
654                         db_free_columns(_r);
655                         return -4;
656                 }
657
658                 RES_TYPES(_r)[col] = colp->type;
659
660                 LM_DBG("Column with name [%.*s] found: %d\n", RES_NAMES(_r)[col]->len, RES_NAMES(_r)[col]->s, colp->type);
661                 LM_DBG("RES_NAMES(%p)[%d]=[%.*s]\n", RES_NAMES(_r)[col], col,
662                         RES_NAMES(_r)[col]->len, RES_NAMES(_r)[col]->s);
663         }
664         return 0;
665 }
666
667 static int cassa_convert_result_raw(db_val_t* sr_cell, str *col_val) {
668
669         if(!col_val->s) {
670                 LM_DBG("Column not found in result - NULL\n");
671                 sr_cell->nul  = 1;
672                 return 0;
673         }
674         col_val->len = strlen(col_val->s);
675
676         sr_cell->nul  = 0;
677         sr_cell->free  = 0;
678
679         switch (sr_cell->type) {
680                 case DB1_INT:
681                         if(str2int(col_val, (unsigned int*)&sr_cell->val.int_val) < 0) {
682                                 LM_ERR("Wrong value [%s] - len=%d, expected integer\n", col_val->s, col_val->len);
683                                 return -1;
684                         }
685                         break;
686                 case DB1_BIGINT:
687                         if(sscanf(col_val->s, "%lld", &sr_cell->val.ll_val) < 0) {
688                                 LM_ERR("Wrong value [%s], expected integer\n", col_val->s);
689                                 return -1;
690                         }
691                         break;
692                 case DB1_DOUBLE:
693                         if(sscanf(col_val->s, "%lf", &sr_cell->val.double_val) < 0) {
694                                 LM_ERR("Wrong value [%s], expected integer\n", col_val->s);
695                                 return -1;
696                         }
697                         break;
698                 case DB1_STR:
699                         pkg_str_dup(&sr_cell->val.str_val, col_val);
700                         sr_cell->free  = 1;
701                         break;
702                 case DB1_STRING:
703                         col_val->len++;
704                         pkg_str_dup(&sr_cell->val.str_val, col_val);
705                         sr_cell->val.str_val.len--;
706                         sr_cell->val.str_val.s[col_val->len-1]='\0';
707                         sr_cell->free  = 1;
708                         break;
709                 case DB1_BLOB:
710                         pkg_str_dup(&sr_cell->val.blob_val, col_val);
711                         sr_cell->free  = 1;
712                         break;
713                 case DB1_BITMAP:
714                         if(str2int(col_val, &sr_cell->val.bitmap_val) < 0) {
715                                 LM_ERR("Wrong value [%s], expected integer\n", col_val->s);
716                                 return -1;
717                         }
718                         break;
719                 case DB1_DATETIME:
720                         if(sscanf(col_val->s, "%ld", (long int*)&sr_cell->val.time_val) < 0) {
721                                 LM_ERR("Wrong value [%s], expected integer\n", col_val->s);
722                                 return -1;
723                         }
724                         break;
725                 default:
726                         LM_ERR("unknown data type\n");
727                         return -1;
728         }
729         return 0;
730 }
731
732
733 /**
734  *  This function convert the rows returned in CQL query 
735  *  and adds the values to the returning result structure.
736  *
737  * Handle CQLresult
738  * \param _cql_res  handle for the CQLResult
739  * \param _r result set for storage
740  * \return zero on success, negative value on failure
741  */
742
743 int cql_convert_row(oac::CqlResult& _cql_res, db1_res_t* _r)
744 {
745         std::vector<oac::CqlRow>  res_cql_rows = _cql_res.rows;
746         int rows_no = res_cql_rows.size();
747         int cols_no = res_cql_rows[0].columns.size();
748         str col_val;
749         RES_ROW_N(_r) = rows_no;
750
751         if (db_allocate_rows(_r) < 0) {
752                 LM_ERR("Could not allocate rows.\n");
753                 return -1;
754         }
755
756         for(int ri=0; ri < rows_no; ri++) {
757                 if (db_allocate_row(_r, &(RES_ROWS(_r)[ri])) != 0) {
758                         LM_ERR("Could not allocate row.\n");
759                         return -2;
760                 }
761
762                 // complete the row with the columns 
763                 for(int col = 0; col< cols_no; col++) {
764                         col_val.s = (char*)res_cql_rows[ri].columns[col].value.c_str();
765                         col_val.len = strlen(col_val.s);
766
767                         RES_ROWS(_r)[ri].values[col].type = RES_TYPES(_r)[col]; 
768                         cassa_convert_result_raw(&RES_ROWS(_r)[ri].values[col], &col_val);
769
770                         LM_DBG("Field index %d. %s = %s.\n", col,
771                                 res_cql_rows[ri].columns[col].name.c_str(),
772                                 res_cql_rows[ri].columns[col].value.c_str());
773                 }
774         }
775         return 0;
776
777
778 /*
779  *      The functions for the DB Operations: query, delete, update.
780  * */
781
782 /*
783  * Extracts table name from DML query being used
784  *
785  * */
786 static int get_table_from_query(const str *cql, str *table) {
787
788         char *ptr = cql->s,
789                 *begin = NULL;
790
791         if (cql->s[0] == 's' || cql->s[0] == 'S') {
792                 ptr = strcasestr(cql->s, "from");
793                 ptr += sizeof(char) * 4;
794         }
795         else if (cql->s[0] == 'u' || cql->s[0] == 'U') {
796                 ptr = cql->s + sizeof("update") - 1;
797         }
798         else if (cql->s[0] == 'd' || cql->s[0] == 'D') {
799                 ptr = strcasestr(cql->s, "from");
800                 ptr += sizeof(char) * 4;
801         }
802         else if (cql->s[0] == 'i' || cql->s[0] == 'I') {
803                 ptr = strcasestr(cql->s, "into");
804                 ptr += sizeof(char) * 4;
805         }
806         else 
807                 goto error;
808
809         while (*ptr == ' ' && (ptr - cql->s) <= cql->len) {
810                 ptr++;
811         }
812
813         begin = ptr;
814         ptr   = strchr(begin, ' ');
815
816         if (ptr == NULL)
817                 ptr = cql->s + cql->len;
818         
819         if (ptr - begin <= 0)
820                 goto error;
821         
822         table->s = begin;
823         table->len = ptr - begin;
824
825         return 0;
826
827 error:
828         LM_ERR("Unable to determine operation in cql [%*s]\n", cql->len, cql->s);
829         return -1;
830 }
831
832 /**
833  * Execute a raw SQL query.
834  * \param _h handle for the database
835  * \param _s raw query string
836  * \param _r result set for storage
837  * \return zero on success, negative value on failure
838  */
839 int db_cassa_raw_query(const db1_con_t* _h, const str* _s, db1_res_t** _r)
840 {
841         db1_res_t* db_res = 0;
842         str table_name;
843         dbcassa_table_p tbc;
844         std::vector<oac::CqlRow>  res_cql_rows;
845
846         if (!_h || !_r) {
847                 LM_ERR("Invalid parameter value\n");
848                 return -1;
849         }
850         
851         if (get_table_from_query(_s, &table_name) < 0) { 
852                 LM_ERR("Error parsing table name in CQL string");
853                 return -1;
854         }
855
856         LM_DBG("query table=%.*s\n", table_name.len, table_name.s);
857         LM_DBG("CQL=%s\n", _s->s);
858
859         tbc = dbcassa_db_get_table(&CON_CASSA(_h)->db_name, &table_name);
860         if(!tbc) {
861                 LM_ERR("table %.*s does not exist!\n", table_name.len, table_name.s);
862                 return -1;
863         }
864
865         std::string cql_query(_s->s);
866
867         oac::CqlResult cassa_cql_res;
868
869         try {
870                 CON_CASSA(_h)->con->execute_cql_query(cassa_cql_res, cql_query , oac::Compression::NONE);
871         } catch (const oac::InvalidRequestException &irx) {
872                 LM_ERR("Invalid Request caused error details: %s.\n", irx.why.c_str());
873         } catch (const at::TException &tx) {
874                 LM_ERR("T Exception %s\n", tx.what());
875         } catch (const std::exception &ex) {
876                 LM_ERR("Failed: %s\n", ex.what());
877         } catch (...) {
878                 LM_ERR("Failed to open connection to Cassandra cluster\n");
879         }
880
881         if (!cassa_cql_res.__isset.rows) {
882                 LM_ERR("The resultype rows was not set, no point trying to parse result.\n");
883                 goto error;
884         }
885
886         res_cql_rows = cassa_cql_res.rows;
887
888         /* TODO Handle the other types */
889         switch(cassa_cql_res.type) {
890                 case 1:  LM_DBG("Result set is an ROW Type.\n");
891                         break;
892                 case 2: LM_DBG("Result set is an VOID Type.\n");
893                         break;
894                 case 3: LM_DBG("Result set is an INT Type.\n");
895                         break;
896         }
897
898         db_res = db_new_result();
899         if (!db_res) {
900                 LM_ERR("no memory left\n");
901                 goto error;
902         }
903
904         if(res_cql_rows.size() == 0) {
905                 LM_DBG("The query returned no result\n");
906                 RES_ROW_N(db_res) = 0;
907                 RES_COL_N(db_res)= 0;
908                 *_r = db_res;
909                 goto done;
910         }
911
912         if (cql_get_columns(cassa_cql_res, db_res, tbc) < 0) {
913                 LM_ERR("Error getting column names.");
914                 goto error;
915         }
916
917         if (cql_convert_row(cassa_cql_res, db_res) < 0) {
918                 LM_ERR("Error converting rows");
919                 goto error;
920         }
921
922         *_r = db_res;
923 done:
924         dbcassa_lock_release(tbc);
925
926         LM_DBG("Exited with success\n");
927         return 0;
928
929 error:
930         if(db_res)
931                 db_free_result(db_res);
932         
933         dbcassa_lock_release(tbc);
934         return -1;
935 }
936
937
938
939 /*
940  * Query table for specified rows
941  * _h: structure representing database connection
942  * _k: key names
943  * _op: operators
944  * _v: values of the keys that must match
945  * _c: column names to return
946  * _n: number of key=values pairs to compare
947  * _nc: number of columns to return
948  * _o: order by the specified column
949  */
950 int db_cassa_query(const db1_con_t* _h, const db_key_t* _k, const db_op_t* _op,
951                 const db_val_t* _v, const db_key_t* _c, int _n, int _nc,
952                 const db_key_t _o, db1_res_t** _r)
953 {
954         db1_res_t* db_res = 0;
955         int rows_no;
956         ColumnVecPtr cassa_result;
957         dbcassa_table_p tbc;
958         int seckey_len;
959
960         if (!_h || !CON_TABLE(_h) || !_r) {
961                 LM_ERR("invalid parameter value\n");
962                 return -1;
963         }
964         LM_DBG("query table=%s\n", _h->table->s);
965
966         /** Construct and send the query to Cassandra Cluster **/
967
968         cassa_result = cassa_translate_query(_h, _k, _v, _c, _n, _nc, &rows_no);
969
970         if(cassa_result.get() == NULL) {
971                 LM_ERR("Failed to query Cassandra cluster\n");
972                 return -1;
973         }
974
975         /* compare the number of queried cols with the key cols*/
976 //      if(no_kc + no_sec_kc < _n) { /* TODO */
977                 /* filter manually for the rest of the values */
978 //      }
979
980         db_res = db_new_result();
981         if (!db_res) {
982                 LM_ERR("no memory left\n");
983                 goto error;
984         }
985         RES_COL_N(db_res)= _nc;
986         if(!db_allocate_columns(db_res, _nc) < 0) {
987                 LM_ERR("no more memory\n");
988                 goto error;
989         }
990
991         tbc = dbcassa_db_get_table(&CON_CASSA(_h)->db_name, CON_TABLE(_h));
992         if(!tbc) {
993                 LM_ERR("table %.*s does not exist!\n", CON_TABLE(_h)->len, CON_TABLE(_h)->s);
994                 return -1;
995         }
996
997         /** Convert the result from Cassandra **/
998         /* fill in the columns name and type */
999         for(int col = 0; col < _nc; col++) {
1000                 RES_NAMES(db_res)[col] = (str*)pkg_malloc(sizeof(str));
1001                 if (! RES_NAMES(db_res)[col]) {
1002                         LM_ERR("no private memory left\n");
1003                         dbcassa_lock_release(tbc);
1004                         RES_COL_N(db_res) = col;
1005                         db_free_columns(db_res);
1006                         goto error;
1007                 }
1008
1009                 *RES_NAMES(db_res)[col]   = *_c[col];
1010
1011                 /* search the column in table schema to get the type */
1012                 dbcassa_column_p colp = cassa_search_col(tbc, _c[col]);
1013                 if(!colp) {
1014                         LM_ERR("No column with name [%.*s] found\n", _c[col]->len, _c[col]->s);
1015                         dbcassa_lock_release(tbc);
1016                         RES_COL_N(db_res) = col;
1017                         db_free_columns(db_res);
1018                         goto error;
1019                 }
1020                 RES_TYPES(db_res)[col] = colp->type;
1021
1022                 LM_DBG("RES_NAMES(%p)[%d]=[%.*s]\n", RES_NAMES(db_res)[col], col,
1023                                 RES_NAMES(db_res)[col]->len, RES_NAMES(db_res)[col]->s);
1024         }
1025         /* TODO  if all columns asked - take from table schema */
1026         seckey_len = tbc->seckey_len;
1027         dbcassa_lock_release(tbc);
1028
1029         if(!cassa_result->size()) {
1030                 LM_DBG("The query returned no result\n");
1031                 RES_ROW_N(db_res) = 0;
1032                 goto done;
1033         }
1034
1035         /* Initialize the row_slices vector for the case with one column and no secondary key */
1036         if(rows_no == 1) {
1037                 row_slices[0][0]= cassa_result->size();
1038                 row_slices[0][1]= 0;
1039
1040                 if(seckey_len) { /* if the table has a secondary key defined */
1041                         /* pass through the result once to see how many rows there are */
1042                         rows_no = cassa_result_separate_rows(*cassa_result);
1043                         if(rows_no < 0) {
1044                                 LM_ERR("Wrong formated column names\n");
1045                                 goto error;
1046                         }
1047                 }
1048         }
1049
1050         RES_ROW_N(db_res) = rows_no;
1051
1052         if (db_allocate_rows(db_res) < 0) {
1053                 LM_ERR("could not allocate rows");
1054                 goto error;
1055         }
1056
1057         for(int ri=0; ri < rows_no; ri++) {
1058                 if (db_allocate_row(db_res, &(RES_ROWS(db_res)[ri])) != 0) {
1059                         LM_ERR("could not allocate row");
1060                         goto error;
1061                 }
1062
1063                 /* complete the row with the columns */
1064                 for(int col = 0; col< _nc; col++) {
1065                         RES_ROWS(db_res)[ri].values[col].type = RES_TYPES(db_res)[col];
1066                         cassa_convert_result(_c[col], *cassa_result, (ri>0?row_slices[ri-1][0]:0),  row_slices[ri][0],
1067                                         row_slices[ri][1], &RES_ROWS(db_res)[ri].values[col]);
1068                 }
1069         }
1070
1071 done:
1072         *_r = db_res;
1073         LM_DBG("Exited with success\n");
1074         return 0;
1075
1076 error:
1077         if(db_res)
1078                 db_free_result(db_res);
1079         return -1;
1080 }
1081
1082 /*
1083  * Insert or update the table for specified row key
1084  * _h: structure representing database connection
1085  * _k: key names
1086  * _op: operators
1087  * _v: values of the keys that must match
1088  * _uk: column names to update
1089  * _uv: values for the columns to update
1090  * _n: number of key=values pairs to compare
1091  * _un: number of columns to update
1092  */
1093 int db_cassa_modify(const db1_con_t* _h, const db_key_t* _k, const db_val_t* _v,
1094                 const db_key_t* _uk, const db_val_t* _uv, int _n, int _un)
1095 {
1096         dbcassa_table_p tbc;
1097         char row_key[cassa_max_key_len];
1098         char sec_key[cassa_max_key_len];
1099         int64_t ts = 0;
1100         str ts_col_name={0, 0};
1101         int seckey_len;
1102         unsigned int curr_time = time(NULL);
1103
1104         if (!_h || !CON_TABLE(_h) || !_k || !_v) {
1105                 LM_ERR("invalid parameter value\n");
1106                 return -1;
1107         }
1108
1109         LM_DBG("modify table=%s\n", _h->table->s);
1110
1111         /** Lock table schema and construct primary and secondary key **/
1112         tbc = dbcassa_db_get_table(&CON_CASSA(_h)->db_name, CON_TABLE(_h));
1113         if(!tbc) {
1114                 LM_ERR("table %.*s does not exist!\n", CON_TABLE(_h)->len, CON_TABLE(_h)->s);
1115                 return -1;
1116         }
1117         if(tbc->ts_col)
1118                 pkg_str_dup(&ts_col_name, (const str*)&tbc->ts_col->name);
1119
1120         cassa_constr_key(_k, _v, _n, tbc->key_len, tbc->key, 0, row_key);
1121         cassa_constr_key(_k, _v, _n, tbc->seckey_len, tbc->sec_key, 0, sec_key);
1122         seckey_len = tbc->seckey_len;
1123
1124         dbcassa_lock_release(tbc);
1125
1126         /** Construct and send the query to Cassandra Cluster **/
1127         try {
1128                 /* Set the columns */
1129                 std::vector<oac::Mutation> mutations;
1130                 for(int i=0; i< _un; i++) {
1131                         if(_uv[i].nul)
1132                                 continue;
1133
1134                         std::stringstream out;
1135                         std::string value;
1136                         int cont = 0;
1137
1138                         switch(_uv[i].type) {
1139                                 case DB1_INT:   out << _uv[i].val.int_val;
1140                                                                 value = out.str();
1141                                                                 break;
1142                                 case DB1_BIGINT:out << _uv[i].val.ll_val;
1143                                                                 value = out.str();
1144                                                                 break;
1145                                 case DB1_DOUBLE:out << _uv[i].val.double_val;
1146                                                                 value = out.str();
1147                                                                 break;
1148                                 case DB1_BITMAP:out << _uv[i].val.bitmap_val;
1149                                                                 value = out.str();
1150                                                                 break;
1151                                 case DB1_STRING:value = _uv[i].val.string_val;
1152                                                                 break;
1153                                 case DB1_STR:   if(!_uv[i].val.str_val.s) {
1154                                                                         cont = 1;
1155                                                                         break;
1156                                                                 }
1157                                                                 value = std::string(_uv[i].val.str_val.s, _uv[i].val.str_val.len);
1158                                                                 break;
1159                                 case DB1_BLOB:  value = std::string(_uv[i].val.blob_val.s, _uv[i].val.blob_val.len);
1160                                                                 break;
1161                                 case DB1_DATETIME: { /* own block because we declare a variable here */
1162                                         unsigned int exp_time = (unsigned int)_uv[i].val.time_val;
1163                                                                         out << exp_time;
1164                                                                         value = out.str();
1165                                                                         if(ts_col_name.s && ts_col_name.len==_uk[i]->len &&
1166                                                                                         strncmp(ts_col_name.s, _uk[i]->s, ts_col_name.len)==0) {
1167                                                                                 ts = exp_time;
1168                                                                                 LM_DBG("Found timestamp col [%.*s]\n", ts_col_name.len, ts_col_name.s);
1169                                                                         }
1170                                 } break;
1171                                 case DB1_UNKNOWN:
1172                                                                         LM_ERR("unknown data type\n");
1173                                                                         /* needs probably more errors handling, free at least the memory */
1174                                                                         if(ts_col_name.s)
1175                                                                                 pkg_free(ts_col_name.s);
1176                                                                         ts_col_name.s = 0;
1177                                                                         return -1;
1178                         }
1179                         if (cont)
1180                                 continue;
1181
1182                         LM_DBG("ADDED column [%.*s] type [%d], value [%s]\n", _uk[i]->len, _uk[i]->s,
1183                                 _uv[i].type, value.c_str());
1184
1185                         oac::Mutation mut;
1186                         oac::ColumnOrSuperColumn col;
1187                         if(seckey_len) {
1188                                 col.column.name = sec_key;
1189                                 col.column.name.push_back(cassa_key_delim);
1190                                 col.column.name.append(_uk[i]->s);
1191                         }
1192                         else
1193                                 col.column.name = _uk[i]->s;
1194                         col.column.value = value;
1195                         col.column.__isset.value = true;
1196                         col.__isset.column = true;
1197                         col.column.timestamp = curr_time;
1198                         col.column.__isset.timestamp = true;
1199                         mut.column_or_supercolumn = col;
1200                         mut.__isset.column_or_supercolumn = true;
1201                         mutations.push_back(mut);
1202                 }
1203                 if(ts_col_name.s)
1204                         pkg_free(ts_col_name.s);
1205                 ts_col_name.s = 0;
1206
1207                 if(ts) {
1208                         int32_t ttl = ts - curr_time;
1209                         LM_DBG("Set expires to %d seconds\n", ttl);
1210                         for(size_t mi=0; mi< mutations.size(); mi++) {
1211                                 mutations[mi].column_or_supercolumn.column.ttl = ttl;
1212                                 mutations[mi].column_or_supercolumn.column.__isset.ttl = true;
1213                         }
1214                 }
1215
1216                 LM_DBG("Perform the mutation, add [%d] columns\n", (int)mutations.size());
1217
1218                 std::map<std::string, std::vector<oac::Mutation> > innerMap;
1219                 innerMap.insert(std::pair<std::string, std::vector<oac::Mutation> > (_h->table->s, mutations));
1220                 std::map <std::string, std::map<std::string, std::vector<oac::Mutation> > > CFMap;
1221                 CFMap.insert(std::pair<std::string, std::map<std::string, std::vector<oac::Mutation> > >(row_key, innerMap));
1222                 unsigned int retr = 0;
1223
1224                 do {
1225                         if(CON_CASSA(_h)->con) {
1226                                 try{
1227                                         CON_CASSA(_h)->con->batch_mutate(CFMap, oac::ConsistencyLevel::ONE);
1228                                         return 0;
1229                                 }  catch (const att::TTransportException &tx) {
1230                                         LM_ERR("Failed to query: %s\n", tx.what());
1231                                 }
1232                         }
1233                         dbcassa_reconnect(CON_CASSA(_h));
1234                 } while (cassa_auto_reconnect && retr++ < cassa_retries);
1235                 LM_ERR("Failed to connect, retries exceeded.\n");
1236         } catch (const oac::InvalidRequestException ir) {
1237                 LM_ERR("Failed Invalid query request: %s\n", ir.why.c_str());
1238         } catch (const at::TException &tx) {
1239                 LM_ERR("Failed generic Thrift error: %s\n", tx.what());
1240         } catch (const std::exception &ex) {
1241                 LM_ERR("Failed std error: %s\n", ex.what());
1242         } catch (...) {
1243                 LM_ERR("Failed generic error\n");
1244         }
1245
1246         LM_ERR("Insert/Update query failed\n");
1247         return -1;
1248 }
1249
1250 int db_cassa_replace(const db1_con_t* _h, const db_key_t* _k, const db_val_t* _v,
1251                 int _n, const int _un, const int _m)
1252 {
1253         LM_DBG("db_cassa_replace:\n");
1254         return db_cassa_modify(_h, _k, _v, _k, _v, _n, _n);
1255 }
1256
1257 int db_cassa_insert(const db1_con_t* _h, const db_key_t* _k, const db_val_t* _v,
1258                 int _n)
1259 {
1260         LM_DBG("db_cassa_insert:\n");
1261         return db_cassa_modify(_h, _k, _v, _k, _v, _n, _n);
1262 }
1263
1264
1265 int db_cassa_update(const db1_con_t* _h, const db_key_t* _k, const db_op_t* _o,
1266                 const db_val_t* _v, const db_key_t* _uk, const db_val_t* _uv,
1267                 int _n, int _un)
1268 {
1269         LM_DBG("db_cassa_update:\n");
1270         return db_cassa_modify(_h, _k, _v, _uk, _uv, _n, _un);
1271 }
1272
1273
1274 int db_cassa_free_result(db1_con_t* _h, db1_res_t* _r)
1275 {
1276         return db_free_result(_r);
1277 }
1278
1279 /*
1280  * Delete after primary or primary and secondary key
1281  * _h: structure representing database connection
1282  * _k: key names
1283  * _op: operators
1284  * _v: values of the keys that must match
1285  * _n: number of key=values pairs to compare
1286  */
1287 int db_cassa_delete(const db1_con_t* _h, const db_key_t* _k, const db_op_t* _o,
1288                 const db_val_t* _v, int _n)
1289 {
1290         oac::CassandraClient* cassa_client = CON_CASSA(_h)->con;
1291         char row_key[cassa_max_key_len];
1292         char sec_key[cassa_max_key_len];
1293         dbcassa_table_p tbc;
1294         int no_kc, no_sec_kc;
1295         unsigned int retr = 0;
1296         int seckey_len;
1297         oac::Mutation m;
1298
1299         if (!_h || !CON_TABLE(_h) || !_k || !_v) {
1300                 LM_ERR("invalid parameter value\n");
1301                 return -1;
1302         }
1303
1304         LM_DBG("query table=%s\n", _h->table->s);
1305
1306         /* get the table schema and construct primary and secondary key */
1307         tbc = dbcassa_db_get_table(&CON_CASSA(_h)->db_name, CON_TABLE(_h));
1308         if(!tbc)
1309         {
1310                 LM_ERR("table %.*s does not exist!\n", CON_TABLE(_h)->len, CON_TABLE(_h)->s);
1311                 return -1;
1312         }
1313
1314         cassa_constr_key(_k, _v, _n, tbc->key_len, tbc->key, &no_kc,   row_key);
1315         cassa_constr_key(_k, _v, _n, tbc->seckey_len, tbc->sec_key, &no_sec_kc, sec_key);
1316         seckey_len = tbc->seckey_len;
1317
1318         if (_n != no_kc && no_sec_kc == seckey_len) {
1319                 /* if the conditions are also for secondary key */
1320                 LM_DBG("Delete after primary and secondary key %s %s\n", row_key, sec_key);
1321                 dbcassa_column_p colp = tbc->cols;
1322                 try {
1323                         while(colp) {
1324                                 std::string col_name = sec_key;
1325                                 col_name.push_back(cassa_key_delim);
1326                                 col_name.append(colp->name.s);
1327                                 m.deletion.predicate.column_names.push_back(col_name);
1328                                 colp = colp->next;
1329                         }
1330                 } catch (...) {
1331                         LM_ERR("Failed to construct the list of column names\n");
1332                         dbcassa_lock_release(tbc);
1333                         return -1;
1334                 }
1335         }
1336
1337         dbcassa_lock_release(tbc);
1338
1339         for(int i=0; i < _n; i++)
1340                 LM_DBG("delete query col = %.*s\n", _k[i]->len, _k[i]->s);
1341
1342         if(no_kc == 0 ) {
1343                 LM_DBG("Delete operation not supported\n");
1344                 return -1;
1345         }
1346
1347         try {
1348                 if (_n == no_kc) {
1349                         LM_DBG("Delete after row key %s\n", row_key);
1350                         oac::ColumnPath cp;
1351                         cp.column_family = _h->table->s;
1352                         do {
1353                                 if(CON_CASSA(_h)->con) {
1354                                         try {
1355                                                 cassa_client->remove(row_key, cp, (int64_t)time(0), oac::ConsistencyLevel::ONE);
1356                                                 return 0;
1357                                         } catch  (const att::TTransportException &tx) {
1358                                                         LM_ERR("Failed to query: %s\n", tx.what());
1359                                         }
1360                                 }
1361                                 dbcassa_reconnect(CON_CASSA(_h));
1362                         } while(cassa_auto_reconnect && retr++ < cassa_retries);
1363                         LM_ERR("Failed to connect, retries exceeded.\n");
1364                 } else {
1365
1366                         if(!seckey_len) {
1367                                 LM_ERR("Delete operation not supported\n");
1368                                 return -1;
1369                         }
1370
1371 //                      oac::Mutation m;
1372                         m.deletion.timestamp = (int64_t)time(0);
1373                         m.deletion.__isset.timestamp = true;
1374                         m.__isset.deletion = true;
1375
1376 #if 0
1377                         /* push all columns for the corresponding secondary key */
1378                         tbc = dbcassa_db_get_table(&CON_CASSA(_h)->db_name, CON_TABLE(_h));
1379                         if(!tbc)
1380                         {
1381                                 LM_ERR("table %.*s does not exist!\n", CON_TABLE(_h)->len, CON_TABLE(_h)->s);
1382                                 return -1;
1383                         }
1384                         dbcassa_column_p colp = tbc->cols;
1385                         try {
1386                                 while(colp) {
1387                                         std::string col_name = sec_key;
1388                                         col_name.push_back(cassa_key_delim);
1389                                         col_name.append(colp->name.s);
1390                                         m.deletion.predicate.column_names.push_back(col_name);
1391                                         colp = colp->next;
1392                                 }
1393                         } catch (...) {
1394                                 LM_ERR("Failed to construct the list of column names\n");
1395                                 dbcassa_lock_release(tbc);
1396                                 return -1;
1397                         }
1398                         dbcassa_lock_release(tbc);
1399 #endif
1400                         m.deletion.__isset.predicate = true;
1401                         m.deletion.predicate.__isset.column_names = true; // set
1402
1403                         std::vector<oac::Mutation> mutations;
1404                         mutations.push_back(m);
1405
1406                         /* innerMap - column_family + mutations vector */
1407                         std::map<std::string, std::vector<oac::Mutation> > innerMap;
1408                         innerMap.insert(std::pair<std::string, std::vector<oac::Mutation> > (_h->table->s, mutations));
1409                         std::map <std::string, std::map<std::string, std::vector<oac::Mutation> > > CFMap;
1410                         CFMap.insert(std::pair<std::string, std::map<std::string, std::vector<oac::Mutation> > >(row_key, innerMap));
1411
1412                         do {
1413                                 if(CON_CASSA(_h)->con) {
1414                                         try {
1415                                                 cassa_client->batch_mutate(CFMap, oac::ConsistencyLevel::ONE);
1416                                                 return 0;
1417                                         } catch  (const att::TTransportException &tx) {
1418                                                         LM_ERR("Failed to query: %s\n", tx.what());
1419                                         }
1420                                 }
1421                                 dbcassa_reconnect(CON_CASSA(_h));
1422                         } while(cassa_auto_reconnect && retr++ < cassa_retries);
1423                 }
1424                 LM_ERR("Failed to connect, retries exceeded.\n");
1425         } catch (const oac::InvalidRequestException ir) {
1426                 LM_ERR("Invalid query: %s\n", ir.why.c_str());
1427         } catch (const at::TException &tx) {
1428                 LM_ERR("Failed TException: %s\n", tx.what());
1429         } catch (std::exception &e) {
1430                 LM_ERR("Failed: %s\n", e.what());
1431         } catch (...) {
1432                 LM_ERR("Failed generic error\n");
1433         }
1434
1435         return -1;
1436 }
1437