New Module NSQ
authorEmmanuel Schmidbauer <emmanuel@getweave.com>
Mon, 25 Apr 2016 16:15:27 +0000 (12:15 -0400)
committerEmmanuel Schmidbauer <emmanuel@getweave.com>
Mon, 25 Apr 2016 16:15:27 +0000 (12:15 -0400)
20 files changed:
modules/nsq/Makefile [new file with mode: 0644]
modules/nsq/README.md [new file with mode: 0644]
modules/nsq/defs.h [new file with mode: 0644]
modules/nsq/doc/Makefile [new file with mode: 0644]
modules/nsq/doc/nsq.xml [new file with mode: 0644]
modules/nsq/doc/nsq_admin.xml [new file with mode: 0644]
modules/nsq/http.h [new file with mode: 0644]
modules/nsq/nsq.h [new file with mode: 0644]
modules/nsq/nsq_json.c [new file with mode: 0644]
modules/nsq/nsq_json.h [new file with mode: 0644]
modules/nsq/nsq_mod.c [new file with mode: 0644]
modules/nsq/nsq_mod.h [new file with mode: 0644]
modules/nsq/nsq_pua.c [new file with mode: 0644]
modules/nsq/nsq_pua.h [new file with mode: 0644]
modules/nsq/nsq_reader.c [new file with mode: 0644]
modules/nsq/nsq_reader.h [new file with mode: 0644]
modules/nsq/nsq_trans.c [new file with mode: 0644]
modules/nsq/nsq_trans.h [new file with mode: 0644]
modules/nsq/reader.c [new file with mode: 0644]
modules/nsq/utlist.h [new file with mode: 0644]

diff --git a/modules/nsq/Makefile b/modules/nsq/Makefile
new file mode 100644 (file)
index 0000000..dc685f8
--- /dev/null
@@ -0,0 +1,23 @@
+# $Id: $
+#
+# NSQ
+#
+#
+# WARNING: do not run this directly, it should be run by the master Makefile
+
+include ../../Makefile.defs
+
+auto_gen=
+NAME=nsq.so
+
+LIBS=-lnsq -lev -levbuffsock -lcurl -ljson-c
+DEFS+=-I$(LOCALBASE)/include -I/usr/local/include $(shell pkg-config --cflags json-c)
+
+DEFS+=-DKAMAILIO_MOD_INTERFACE
+
+SERLIBPATH=../../lib
+SER_LIBS=$(SERLIBPATH)/srdb2/srdb2 $(SERLIBPATH)/srdb1/srdb1
+SER_LIBS+=$(SERLIBPATH)/kmi/kmi
+SER_LIBS+=$(SERLIBPATH)/kcore/kcore
+
+include ../../Makefile.modules
diff --git a/modules/nsq/README.md b/modules/nsq/README.md
new file mode 100644 (file)
index 0000000..faa6412
--- /dev/null
@@ -0,0 +1,312 @@
+# NSQ Module for Kamailio
+
+## 1. Overview
+
+NSQ is a realtime distributed messaging platform designed to operate at scale, handling billions of messages per day.
+It promotes distributed and decentralized topologies without single points of failure, enabling fault tolerance and high availability coupled with a reliable message delivery guarantee.
+
+From a high-level, the purpose of the module might be for things like:
+
+* Integrate to an application to make real-time routing decisions (instead of using, say, a SQL database)
+* Provide a real-time integration into your program, instead of your database, so you can overlay additional logic in your preferred language while also utilizing a message bus
+* Utilize messaging to have a distributed messaging layer, such that machines processing requests/responses/events can go up/down or share the workload and your Kamailio node will still be happy
+
+Supported operations are:
+
+* publish json payloads to nsq topics
+* publish json payloads to nsq topics and wait for correlated response message
+* subscribe to an nsq topic and channel and handle events from that channel
+
+The NSQ module also has support to publish updates to presence module thru the nsq_pua_publish function.
+
+This module is heavily based on the Kazoo module from 2600hz.
+
+## 2. How it works
+
+The module works with a main forked process that does the communication with your nsq system for issuing publishes, waiting for replies, and consuming messages. When it consumes a message it defers the process to a worker thread so it doesn't block the main process (uses libev).
+
+### 2.1. Event Routes
+
+The worker process issues an event-route where we can act on the received payload. The name of the event-route is composed by values extracted from the payload.
+
+NSQ module will try to execute the event route from most significant to less significant. define the event route like event_route[nsq:consumer-event[-payload_key_value[-payload_subkey_value]]]
+
+#### Example
+```
+...
+modparam("nsq", "consumer_event_key", "Event-Type")
+modparam("nsq", "consumer_event_subkey", "Event-Name")
+...
+
+event_route[nsq:consumer-event-presence-update]
+{
+# presence is the value extracted from Event-Type field in json payload
+# update is the value extracted from Event-Name field in json payload
+xlog("L_INFO", "received $(kzE{kz.json,Event-Package}) update for $(kzE{kz.json,From})");
+...
+}
+
+event_route[nsq:consumer-event-presence]
+{
+# presence is the value extracted from Event-Type field in json payload
+xlog("L_INFO", "received $(kzE{kz.json,Event-Package}) update for $(kzE{kz.json,From})");
+...
+}
+
+event_route[nsq:consumer-event]
+{
+# this event route is executed if we can't find the previous
+}
+
+```
+
+### 2.2 Acknowledge Messages
+
+Consumed messages have the option of being acknowledged in two ways:
+
+* immediately when received
+* after processing by the worker
+
+
+## 3. Dependencies
+
+### 3.1. Kamailio Modules
+
+The following modules must be loaded before this module:
+
+* none
+
+### 3.2. External Libraries or Applications
+
+* libev
+* libjson
+* libuuid
+
+## 4. Parameters
+
+### 4.1. NSQ Client
+
+#### 4.1.1. nsqd_address(str)
+
+The http address of the nsqd to post messages to
+
+_Default value is Null. You must set this parameter value for the module to work_
+
+__Example__
+```
+...
+modparam("nsq", "nsqd_address", "127.0.0.1:4151")
+...
+```
+
+#### 4.1.2. lookupd_address(str)
+
+The http address of the nsq lookupd servers ( _comma seperated_ )
+
+_Default value is Null. You must set this parameter value for the module to work_
+
+__Example__
+```
+...
+modparam("nsq", "lookupd_address", "10.10.10.1:4161,10.10.10.2:4161")
+...
+```
+
+#### 4.1.3. consumer_topic(str)
+
+The topic to listen on for inbound events
+
+__Example__
+```
+...
+modparam("nsq", "consumer_topic", "kamailio")
+...
+```
+
+#### 4.1.4. consumer_channel(str)
+
+The channel to listen on for inbound events
+
+__Example__
+```
+...
+modparam("nsq", "consumer_channel", "sip-proxy-01")
+...
+```
+
+#### 4.1.5. consumer_event_key(str)
+
+The JSON property name to watch for for handling event_routes
+
+_Default value is "Event-Type"_
+
+__Example__
+```
+...
+modparam("nsq", "consumer_event_key", "Type")
+...
+```
+
+
+#### 4.1.6. consumer_event_subkey(str)
+
+The JSON property sub key name to watch for for handling event_routes
+
+_Default value is "Event-Name"_
+
+__Example__
+```
+...
+modparam("nsq", "consumer_event_subkey", "Name")
+...
+```
+
+
+#### 4.1.7. max_in_flight(int)
+
+Number of messages the nsq client will handle concurrently
+
+_Default value is 100_
+
+__Example__
+```
+...
+modparam("nsq", "max_in_flight", 5)
+...
+```
+
+#### 4.1.7. query_timeout(int)
+
+Number of seconds until timeout for query requests
+
+_Default value is 2_
+
+__Example__
+```
+...
+modparam("nsq", "query_timeout", 5)
+...
+```
+
+### 4.2. Presence Related
+
+#### 4.2.1. db_url(str)
+
+The database for the presentity table.
+
+If set, the nsq_ppua_publish function will update the presentity status in the database.
+
+_Default value is “NULL”._
+
+__Example__
+```
+...
+modparam("nsq", "db_url", "mysql://kamailio:kamailiorw@localhost/kamailio")
+...
+```
+
+#### 4.2.2. presentity_table(str)
+
+The name of the presentity table in the database.
+
+_Default value is “presentity”._
+
+__Example__
+```
+...
+modparam("nsq", "presentity_table", "my_presentity_table")
+...
+```
+
+
+## 5. Functions
+
+### 5.1. nsq related
+
+#### 5.1.1. nsq_publish(topic, json_payload)
+
+The function publishes a json payload to the nsq topic passed in.
+
+This function can be used from ANY ROUTE.
+
+__Example__
+```
+...
+$var(nsq_payload_request) = "{'Event-Type' : 'directory', 'Event-Name' : 'reg_success', 'Contact' : '" + $var(fs_contact) + "', 'Call-ID' : '" + $ci + "', 'Realm' : '" + $fd +"', 'Username' : '" + $fU + "', 'From-User' : '" + $fU + "', 'From-Host' : '" + $fd + "', 'To-User' : '" + $tU +"', 'To-Host' : '" + $td + "', 'User-Agent' : '" + $ua +"' ," + $var(register_contants)+ " }";
+nsq_publish("registrations", $var(nsq_payload_request));
+...
+```
+
+#### 5.1.2. nsq_query(topic, json_payload [, target_var])
+
+The function publishes a json payload to nsq, waits for a correlated message and puts the result in target_var. target_var is optional as the function also puts the result in pseudo-variable $nqR.
+
+This function can be used from ANY ROUTE.
+
+__Example__
+```
+...
+$var(nsq_payload_request) = "{'Event-Category' : 'call_event' , 'Event-Name' : 'query_user_channels_req', 'Realm' : '" + $fd + "', 'Username' : '" + $fU + "', 'Active-Only' : false }";
+nsq_encode("$ci", "$var(callid_encoded)");
+if(nsq_query("callevt", $var(nsq_payload_request), "$var(nsq_result)")) {
+   nsq_json("$var(nsq_result)", "Channels[0].switch_url", "$du");
+   if($du != $null) {
+       xlog("L_INFO", "$ci|log|user channels found redirecting call to $du");
+       return;
+   }
+}
+...
+```
+
+### 5.2. presence related
+
+#### 5.2.1. nsq_pua_publish(json_payload)
+
+The function build presentity state from json_payload and updates presentity table.
+
+This function can be used from ANY ROUTE.
+
+__Example__
+```
+...
+event_route[nsq:consumer-event-presence-update]
+{
+    xlog("L_INFO", "received $(nqE{nq.json,Event-Package}) update for $(nqE{nq.json,From})");
+    nsq_pua_publish($kzE);
+    pres_refresh_watchers("$(nqE{nq.json,From})", "$(nqE{nq.json,Event-Package})", 1);
+}
+...
+```
+
+### 5.3. other
+
+### 5.3. presence related
+
+#### 5.3.1. nsq_encode(to_encode, target_var)
+
+The function encodes the 1st parameter to JSON and puts the result in the 2nd parameter.
+
+This function can be used from ANY ROUTE.
+
+__Example__
+```
+...
+event_route[nsq:consumer-event-presence-update]
+{
+    xlog("L_INFO", "received $(nqE{nq.json,Event-Package}) update for $(nqE{nq.json,From})");
+    nsq_pua_publish($nqE);
+    pres_refresh_watchers("$(nqE{nq.json,From})", "$(nqE{nq.json,Event-Package})", 1);
+}
+...
+```
+
+
+
+## 6. Exported pseudo-variables
+
+## 7. Transformations
+
+
+
+
+
diff --git a/modules/nsq/defs.h b/modules/nsq/defs.h
new file mode 100644 (file)
index 0000000..7e75756
--- /dev/null
@@ -0,0 +1,106 @@
+/*
+ * defs.h
+ *
+ */
+
+#ifndef _NSQ_DEFS_H_
+#define _NSQ_DEFS_H_
+
+#define BLF_MAX_DIALOGS 8
+#define BLF_JSON_PRES          "Presentity"
+#define BLF_JSON_PRES_USER     "Presentity-User"
+#define BLF_JSON_PRES_REALM    "Presentity-Realm"
+#define BLF_JSON_FROM          "From"
+#define BLF_JSON_FROM_USER     "From-User"
+#define BLF_JSON_FROM_REALM    "From-Realm"
+#define BLF_JSON_FROM_URI      "From-URI"
+#define BLF_JSON_TO            "To"
+#define BLF_JSON_TO_USER       "To-User"
+#define BLF_JSON_TO_REALM      "To-Realm"
+#define BLF_JSON_TO_URI                "To-URI"
+#define BLF_JSON_CALLID        "Call-ID"
+#define BLF_JSON_TOTAG         "To-Tag"
+#define BLF_JSON_FROMTAG       "From-Tag"
+#define BLF_JSON_STATE         "State"
+#define BLF_JSON_USER          "User"
+#define BLF_JSON_QUEUE         "Queue"
+#define BLF_JSON_EXPIRES       "Expires"
+#define BLF_JSON_APP_NAME       "App-Name"
+#define BLF_JSON_APP_VERSION    "App-Version"
+#define BLF_JSON_NODE           "Node"
+#define BLF_JSON_SERVERID       "Server-ID"
+#define BLF_JSON_EVENT_CATEGORY "Event-Category"
+#define BLF_JSON_EVENT_NAME     "Event-Name"
+#define BLF_JSON_TYPE           "Type"
+#define BLF_JSON_MSG_ID         "Msg-ID"
+#define BLF_JSON_DIRECTION      "Direction"
+
+#define BLF_JSON_CONTACT       "Contact"
+#define BLF_JSON_EVENT_PKG      "Event-Package"
+#define MWI_JSON_WAITING        "Messages-Waiting"
+#define MWI_JSON_NEW            "Messages-New"
+#define MWI_JSON_SAVED          "Messages-Saved"
+#define MWI_JSON_URGENT         "Messages-Urgent"
+#define MWI_JSON_URGENT_SAVED   "Messages-Urgent-Saved"
+#define MWI_JSON_ACCOUNT        "Message-Account"
+#define MWI_JSON_FROM          "From"
+#define MWI_JSON_TO            "To"
+
+#define DIALOGINFO_BODY_BUFFER_SIZE 8192
+#define MWI_BODY_BUFFER_SIZE 2048
+#define PRESENCE_BODY_BUFFER_SIZE 4096
+
+#define MWI_BODY             "Messages-Waiting: %.*s\r\nMessage-Account: %.*s\r\nVoice-Message: %.*s/%.*s (%.*s/%.*s)\r\n"
+#define PRESENCE_BODY        "<?xml version=\"1.0\" encoding=\"UTF-8\"?>\
+<presence xmlns=\"urn:ietf:params:xml:ns:pidf\" xmlns:dm=\"urn:ietf:params:xml:ns:pidf:data-model\" xmlns:rpid=\"urn:ietf:params:xml:ns:pidf:rpid\" xmlns:c=\"urn:ietf:params:xml:ns:pidf:cipid\" entity=\"%s\"> \
+<tuple xmlns=\"urn:ietf:params:xml:ns:pidf\" id=\"%s\">\
+<status>\
+<basic>%s</basic>\
+</status>\
+</tuple>\
+<note xmlns=\"urn:ietf:params:xml:ns:pidf\">%s</note>\
+<dm:person xmlns:dm=\"urn:ietf:params:xml:ns:pidf:data-model\" xmlns:rpid=\"urn:ietf:params:xml:ns:pidf:rpid\" id=\"1\">\
+<rpid:activities>%s</rpid:activities>\
+<dm:note>%s</dm:note>\
+</dm:person>\
+</presence>"
+
+#define DIALOGINFO_EMPTY_BODY "<?xml version=\"1.0\" encoding=\"UTF-8\"?>\
+<dialog-info xmlns=\"urn:ietf:params:xml:ns:dialog-info\" version=\"1\" state=\"full\" entity=\"%.*s\"> \
+<dialog call-id=\"76001e23e09704ea9e1257ebea85e1f3\" direction=\"initiator\">\
+<state>terminated</state>\
+</dialog>\
+</dialog-info>"
+
+#define LOCAL_TAG "local-tag=\"%.*s\""
+#define REMOTE_TAG "remote-tag=\"%.*s\""
+
+#define DIALOGINFO_BODY "<?xml version=\"1.0\" encoding=\"UTF-8\"?>\
+<dialog-info xmlns=\"urn:ietf:params:xml:ns:dialog-info\" version=\"1\" state=\"full\" entity=\"%.*s\">\
+<dialog id=\"%.*s\" call-id=\"%.*s\" %.*s %.*s direction=\"%.*s\">\
+<state>%.*s</state>\
+<local>\
+<identity display=\"%.*s\">%.*s</identity>\
+<target uri=\"%.*s\"/>\
+</local>\
+<remote>\
+<identity display=\"%.*s\">%.*s</identity>\
+<target uri=\"%.*s\"/>\
+</remote>\
+</dialog>\
+</dialog-info>"
+
+#define DIALOGINFO_BODY_2 "<?xml version=\"1.0\" encoding=\"UTF-8\"?>\
+<dialog-info xmlns=\"urn:ietf:params:xml:ns:dialog-info\" version=\"1\" state=\"full\" entity=\"%.*s\">\
+<dialog id=\"%.*s\" call-id=\"%.*s\" %.*s %.*s direction=\"%.*s\">\
+<state>%.*s</state>\
+<local>\
+<identity display=\"%.*s\">%.*s</identity>\
+</local>\
+<remote>\
+<identity display=\"%.*s\">%.*s</identity>\
+</remote>\
+</dialog>\
+</dialog-info>"
+
+#endif /* _NSQ_DEFS_H_ */
diff --git a/modules/nsq/doc/Makefile b/modules/nsq/doc/Makefile
new file mode 100644 (file)
index 0000000..6b63117
--- /dev/null
@@ -0,0 +1,4 @@
+docs = nsq.xml
+
+docbook_dir = ../../../docbook
+include $(docbook_dir)/Makefile.module
diff --git a/modules/nsq/doc/nsq.xml b/modules/nsq/doc/nsq.xml
new file mode 100644 (file)
index 0000000..12635fe
--- /dev/null
@@ -0,0 +1,37 @@
+<?xml version="1.0" encoding='ISO-8859-1'?>
+<!DOCTYPE book PUBLIC "-//OASIS//DTD DocBook XML V4.4//EN"
+"http://www.oasis-open.org/docbook/xml/4.4/docbookx.dtd" [
+
+<!-- Include general documentation entities -->
+<!ENTITY % docentities SYSTEM "../../../docbook/entities.xml">
+%docentities;
+
+]>
+
+<book xmlns:xi="http://www.w3.org/2001/XInclude">
+    <bookinfo>
+       <title>NSQ Module</title>
+       <productname class="trade">&kamailioname;</productname>
+       <authorgroup>
+           <author>
+               <firstname>Weave Communications</firstname>
+               <email>comm@getweave.com</email>
+           </author>
+           <editor>
+               <firstname>Emmanuel</firstname>
+               <surname>Schmidbauer</surname>
+               <email>emmanuel@getweave.com</email>
+           </editor>
+       </authorgroup>
+       <copyright>
+           <year>2016</year>
+           <holder>Weave Communications</holder>
+       </copyright>
+    </bookinfo>
+    <toc></toc>
+    
+    <xi:include href="nsq_admin.xml"/>
+    <!-- <xi:include href="db_text_devel.xml"/> -->
+    
+
+</book>
diff --git a/modules/nsq/doc/nsq_admin.xml b/modules/nsq/doc/nsq_admin.xml
new file mode 100644 (file)
index 0000000..980c79a
--- /dev/null
@@ -0,0 +1,492 @@
+<?xml version="1.0" encoding='ISO-8859-1'?>
+<!DOCTYPE book PUBLIC "-//OASIS//DTD DocBook XML V4.4//EN"
+"http://www.oasis-open.org/docbook/xml/4.4/docbookx.dtd" [
+
+<!-- Include general documentation entities -->
+<!ENTITY % docentities SYSTEM "../../../docbook/entities.xml">
+%docentities;
+
+]>
+<!-- Module User's Guide -->
+
+<chapter xmlns:xi="http://www.w3.org/2001/XInclude">
+       <title>&adminguide;</title>
+
+
+    <section>
+    <title>Overview</title>
+    <para> The NSQ module an NSQ consumer. It exposes only consume capabilities into Kamailio.
+    </para>
+<para>
+From a high-level, the purpose of the module might be for things like:
+<itemizedlist>
+<listitem>
+<para>
+Provide a real-time integration into your program, instead of your database, so you can overlay additional logic in your preferred language while also utilizing a message bus
+</para>
+</listitem>
+<listitem>
+<para>
+Utilize messaging to have a distributed messaging layer, such that machines processing requests/responses/events can go up/down or share the workload and your Kamailio node will still be happy
+</para>
+</listitem>
+</itemizedlist>
+</para>
+
+
+<para>
+supported operations are:
+<itemizedlist>
+<listitem>
+<para>
+subscribe to a Topic and Channel
+</para>
+</listitem>
+</itemizedlist>
+</para>
+<para>
+The NSQ module also has support to publish updates to presence module through the nsq_pua_publish function
+</para>
+
+</section>
+    <section>
+    <title>How it works</title>
+<para>
+The module works with a main forked process that does the communication with NSQ for consuming messages. When it consumes a message it defers the process to a worker process so that it doesn't block this main process.
+</para>
+    <section>
+    <title>event routes</title>
+    <para>
+The worker process issues an event-route where we can act on the received payload. The name of the event-route is composed by values extracted from the payload.
+    </para>
+    <para>
+    NSQ module will try to execute the event route from most significant to less significant.
+    define the event route like event_route[nsq:consumer-event[-payload_key_value[-payload_subkey_value]]]
+    </para>
+    <para>
+    we can set the key/subkey pair on a subscription base. check the payload on subscribe.
+    </para>
+        <example>
+        <title>define the event route</title>
+        <programlisting format="linespecific">
+...
+modparam("nsq", "consumer_event_key", "Event-Category")
+modparam("nsq", "consumer_event_sub_key", "Event-Name")
+...
+
+event_route[nsq:consumer-event-presence-update]
+{
+# presence is the value extracted from Event-Category field in json payload
+# update is the value extracted from Event-Name field in json payload
+xlog("L_INFO", "received $(nsqE{nsq.json,Event-Package}) update for $(nsqE{kznsqjson,From})");
+...
+}
+
+event_route[nsq:consumer-event-presence]
+{
+# presence is the value extracted from Event-Category field in json payload
+xlog("L_INFO", "received $(nsqE{nsq.json,Event-Package}) update for $(nsqE{nsq.json,From})");
+...
+}
+
+event_route[nsq:consumer-event-event-category-event-name]
+{
+# event-category is the name of the consumer_event_key parameter
+# event-name is the name of the consumer_event_sub_key parameter
+# this event route is executed if we can't find the previous
+...
+}
+
+event_route[nsq:consumer-event-event-category]
+{
+# event-category is the name of the consumer_event_key parameter
+# this event route is executed if we can't find the previous
+...
+}
+
+event_route[nsq:consumer-event]
+{
+# this event route is executed if we can't find the previous
+}
+
+</programlisting>
+        </example>
+</section>
+    <section>
+    <title>aknowledge messages</title>
+<para>
+Consumed messages have the option of being acknowledge in two ways:
+<itemizedlist>
+<listitem>
+<para>
+immediately when received
+</para>
+</listitem>
+<listitem>
+<para>
+after processing by the worker
+</para>
+</listitem>
+</itemizedlist>
+
+    </para>
+
+</section>
+    </section>
+
+    <section>
+    <title>Dependencies</title>
+    <section>
+        <title>&kamailio; Modules</title>
+        <para>
+        The following modules must be loaded before this module:
+            <itemizedlist>
+            <listitem>
+            <para>
+                <emphasis>none</emphasis>.
+            </para>
+            </listitem>
+            </itemizedlist>
+        </para>
+    </section>
+    <section>
+        <title>External Libraries or Applications</title>
+        <itemizedlist>
+            <listitem>
+            <para>
+                <emphasis>libev</emphasis>.
+            </para>
+            </listitem>
+            <listitem>
+            <para>
+                <emphasis>libjson</emphasis>.
+            </para>
+            </listitem>
+            <listitem>
+            <para>
+                <emphasis>libevbuffsock</emphasis>.
+            </para>
+            </listitem>
+                       <listitem>
+            <para>
+                <emphasis>libnsq</emphasis>.
+            </para>
+            </listitem>
+        </itemizedlist>
+
+        </section>
+    </section>
+
+
+    <section>
+    <title>Parameters</title>
+    <section>
+    <title>NSQ related</title>
+    <section>
+        <title><varname>lookupd_address</varname>(str)</title>
+        <para>
+        The nsqlookupd address.
+        </para>
+        <para>
+        <emphasis>Default value is 127.0.0.1</emphasis>
+        </para>
+        <example>
+        <title>Set <varname>lookupd_address</varname> parameter</title>
+        <programlisting format="linespecific">
+...
+modparam("nsq", "lookupd_address", "nsqlookupd.mydomain.com")
+...
+</programlisting>
+        </example>
+    </section>
+    <section>
+        <title><varname>lookupd_port</varname>(int)</title>
+        <para>
+        The nsqlookupd TCP port.
+        </para>
+        <para>
+        <emphasis>Default value is 4161.</emphasis>
+        </para>
+        <example>
+        <title>Set <varname>lookupd_port</varname> parameter</title>
+        <programlisting format="linespecific">
+...
+modparam("nsq", "lookupd_port", 4161)
+...
+</programlisting>
+        </example>
+    </section>
+
+       <section>
+        <title><varname>nsqd_address</varname>(str)</title>
+        <para>
+        The nsqd address. You can specify connecting directly to nsqd instead of using nsqlookupd.
+        </para>
+        <para>
+        <emphasis>Default value is 127.0.0.1</emphasis>
+        </para>
+        <example>
+        <title>Set <varname>nsqd_address</varname> parameter</title>
+        <programlisting format="linespecific">
+...
+modparam("nsq", "nsqd_address", "nsqd.mydomain.com")
+...
+</programlisting>
+        </example>
+    </section>
+    <section>
+        <title><varname>nsqd_port</varname>(int)</title>
+        <para>
+        The nsqd TCP port.
+        </para>
+        <para>
+        <emphasis>Default value is 4150.</emphasis>
+        </para>
+        <example>
+        <title>Set <varname>nsqd_port</varname> parameter</title>
+        <programlisting format="linespecific">
+...
+modparam("nsq", "nsqd_port", 4150)
+...
+</programlisting>
+        </example>
+    </section>
+
+       <section>
+        <title><varname>consumer_use_nsqd</varname>(int)</title>
+        <para>
+        Set to 1 if you'd like to connect to nsqd instead of nsqlookupd.
+        </para>
+        <para>
+        <emphasis>Default value is 0.</emphasis>
+        </para>
+        <example>
+        <title>Set <varname>consumer_use_nsqd</varname> parameter</title>
+        <programlisting format="linespecific">
+...
+modparam("nsq", "consumer_use_nsqd", 1)
+...
+</programlisting>
+        </example>
+    </section>
+
+    <section>
+        <title><varname>consumer_event_key</varname>(str)</title>
+        <para>
+        The default name of the field in json payload to compose the event name 1st part
+        </para>
+        <para>
+        <emphasis>Default value is <quote>Event-Category</quote>.</emphasis>
+        </para>
+        <example>
+        <title>Set <varname>consumer_event_key</varname> parameter</title>
+        <programlisting format="linespecific">
+...
+modparam("nsq", "consumer_event_key", "My-JSON-Field-Name")
+...
+</programlisting>
+        </example>
+    </section>
+
+    <section>
+        <title><varname>consumer_event_sub_key</varname>(str)</title>
+        <para>
+        The default name of the field in json payload to compose the event name 2nd part
+        </para>
+        <para>
+        <emphasis>Default value is <quote>Event-Name</quote>.</emphasis>
+        </para>
+        <example>
+        <title>Set <varname>consumer_event_sub_key</varname> parameter</title>
+        <programlisting format="linespecific">
+...
+modparam("nsq", "consumer_event_sub_key", "My-JSON-SubField-Name")
+...
+</programlisting>
+        </example>
+    </section>
+
+    <section>
+        <title><varname>max_in_flight</varname>(int)</title>
+        <para>
+        The number of messages the consumer can receive before nsqd expects a response.
+        </para>
+        <para>
+        <emphasis>Default value is 1.</emphasis>
+        </para>
+        <example>
+        <title>Set <varname>max_in_flight</varname> parameter</title>
+        <programlisting format="linespecific">
+...
+modparam("nsq", "max_in_flight", 2)
+...
+</programlisting>
+        </example>
+    </section>
+
+       <section>
+        <title><varname>consumer_workers</varname>(int)</title>
+        <para>
+        Number of consumer connections to NSQ per topic_channel.
+        </para>
+        <para>
+        <emphasis>Default value is 4.</emphasis>
+        </para>
+        <example>
+        <title>Set <varname>consumer_workers</varname> parameter</title>
+        <programlisting format="linespecific">
+...
+modparam("nsq", "consumer_workers", 2)
+...
+</programlisting>
+        </example>
+    </section>
+
+    <section>
+        <title><varname>topic_channel</varname>(str)</title>
+        <para>
+        The NSQ Topic and Channel. Delimiter-separated by <quote>:</quote>. It be set multiple times to subscribe to multiple topics and channels. The value of consumer_workers is allocated per topic_channel.
+        </para>
+        <para>
+        <emphasis>Default value is <quote>Kamailio-Topic:Kamailio-Channel</quote>.</emphasis>
+        </para>
+        <example>
+        <title>Set <varname>topic_channel</varname> parameter</title>
+        <programlisting format="linespecific">
+...
+modparam("nsq", "topic_channel", "My-NSQ-Topic:My-NSQ-Channel")
+modparam("nsq", "topic_channel", "My-NSQ-Topic-2:My-NSQ-Channel-2")
+...
+</programlisting>
+        </example>
+    </section>
+
+    </section>
+
+
+
+    <section>
+    <title>presence related</title>
+    <section>
+        <title><varname>db_url</varname>(str)</title>
+        <para>
+        The database for the presentity table.
+        </para>
+        <para>If set, the nsq_pua_publish function will update the presentity status in the database.
+        </para>
+        <para>
+        <emphasis>Default value is <quote>NULL</quote>.</emphasis>
+        </para>
+        <example>
+        <title>Set <varname>db_url</varname> parameter</title>
+        <programlisting format="linespecific">
+...
+modparam("nsq", "db_url", "&defaultdb;")
+...
+</programlisting>
+        </example>
+    </section>
+
+    <section>
+        <title><varname>presentity_table</varname>(str)</title>
+        <para>
+        The name of the presentity table in the database.
+        </para>
+        <para>
+        <emphasis>Default value is <quote>presentity</quote>.</emphasis>
+        </para>
+        <example>
+        <title>Set <varname>presentity_table</varname> parameter</title>
+        <programlisting format="linespecific">
+...
+modparam("nsq", "presentity_table", "my_presentity_table")
+...
+</programlisting>
+        </example>
+    </section>
+
+
+    </section>
+
+
+
+
+
+</section>
+<section>
+    <title>Functions</title>
+
+    <section>
+    <title>presence related</title>
+    <section>
+        <title>
+        <function moreinfo="none">nsq_pua_publish(json_payload)</function>
+        </title>
+        <para>
+        The function build presentity state from json_payload and updates presentity table.
+        </para>
+        <para>
+        This function can be used from ANY ROUTE.
+        </para>
+
+        <example>
+        <title><function>nsq_pua_publish</function> usage</title>
+        <programlisting format="linespecific">
+...
+event_route[nsq:consumer-event-presence-update]
+{
+    xlog("L_INFO", "received $(nsqE{nsq.json,Event-Package}) update for $(nsqE{nsq.json,From})");
+    nsq_pua_publish($nsqE);
+    pres_refresh_watchers("$(nsqE{nsq.json,From})", "$(nsqE{nsq.json,Event-Package})", 1);
+}
+...
+</programlisting>
+        </example>
+    </section>
+
+
+</section>
+
+</section>
+
+        <section>
+        <title>Exported pseudo-variables</title>
+        <itemizedlist>
+            <listitem>
+            <para>
+                <emphasis>$nsqE</emphasis>
+                Contains the payload of a consumed message
+            </para>
+            </listitem>
+        </itemizedlist>
+    </section>
+
+        <section>
+        <title>Transformations</title>
+        <para>The prefix for nsq transformations is nsq.</para>
+        <itemizedlist>
+            <listitem><para>
+                <emphasis>json</emphasis>
+            </para>
+        <example>
+        <title><function>nsq.json</function> usage</title>
+        <programlisting format="linespecific">
+...
+#nsq_json("$var(amqp_result)", "Channels[0].switch_url", "$du");
+$du = $nsqE{nsq.json,Channels[0].switch_url};
+if($du != $null) {
+  xlog("L_INFO", "$ci|log|user channels found redirecting call to $du");
+  return;
+}
+...
+</programlisting>
+        </example>
+
+            </listitem>
+
+        </itemizedlist>
+    </section>
+
+
+</chapter>
+
diff --git a/modules/nsq/http.h b/modules/nsq/http.h
new file mode 100644 (file)
index 0000000..a8bc40c
--- /dev/null
@@ -0,0 +1,48 @@
+#ifndef __http_h
+#define __http_h
+
+#include <ev.h>
+#include <curl/curl.h>
+
+struct HttpClient {
+    CURLM *multi;
+    struct ev_loop *loop;
+    struct ev_timer timer_event;
+    int still_running;
+};
+
+struct HttpResponse {
+    int status_code;
+    struct Buffer *data;
+};
+
+struct HttpRequest {
+    CURL *easy;
+    char *url;
+    struct HttpClient *httpc;
+    char error[CURL_ERROR_SIZE];
+    struct Buffer *data;
+    void (*callback)(struct HttpRequest *req, struct HttpResponse *resp, void *arg);
+    void *cb_arg;
+};
+
+struct HttpSocket {
+    curl_socket_t sockfd;
+    CURL *easy;
+    int action;
+    long timeout;
+    struct ev_io ev;
+    int evset;
+    struct HttpClient *httpc;
+};
+
+struct HttpClient *new_http_client(struct ev_loop *loop);
+void free_http_client(struct HttpClient *httpc);
+struct HttpRequest *new_http_request(const char *url,
+    void (*callback)(struct HttpRequest *req, struct HttpResponse *resp, void *arg), void *cb_arg, char *data);
+void free_http_request(struct HttpRequest *req);
+struct HttpResponse *new_http_response(int status_code, void *data);
+void free_http_response(struct HttpResponse *resp);
+int http_client_get(struct HttpClient *httpc, struct HttpRequest *req);
+
+#endif
diff --git a/modules/nsq/nsq.h b/modules/nsq/nsq.h
new file mode 100644 (file)
index 0000000..8473d36
--- /dev/null
@@ -0,0 +1,91 @@
+#ifndef __nsq_h
+#define __nsq_h
+
+#include <stdio.h>
+#include <stdarg.h>
+#include <stdlib.h>
+#include <time.h>
+#include <ev.h>
+#include <evbuffsock.h>
+
+#include "utlist.h"
+
+typedef enum {NSQ_FRAME_TYPE_RESPONSE, NSQ_FRAME_TYPE_ERROR, NSQ_FRAME_TYPE_MESSAGE} frame_type;
+struct NSQDConnection;
+struct NSQMessage;
+
+struct NSQReader {
+    char *topic;
+    char *channel;
+    void *ctx; //context for call back
+    int max_in_flight;
+    struct NSQDConnection *conns;
+    struct NSQLookupdEndpoint *lookupd;
+    struct ev_timer lookupd_poll_timer;
+    struct ev_loop *loop;
+    void *httpc;
+    void (*connect_callback)(struct NSQReader *rdr, struct NSQDConnection *conn);
+    void (*close_callback)(struct NSQReader *rdr, struct NSQDConnection *conn);
+    void (*msg_callback)(struct NSQReader *rdr, struct NSQDConnection *conn, struct NSQMessage *msg, void *ctx);
+};
+
+struct NSQReader *new_nsq_reader(struct ev_loop *loop, const char *topic, const char *channel, void *ctx,
+    void (*connect_callback)(struct NSQReader *rdr, struct NSQDConnection *conn),
+    void (*close_callback)(struct NSQReader *rdr, struct NSQDConnection *conn),
+    void (*msg_callback)(struct NSQReader *rdr, struct NSQDConnection *conn, struct NSQMessage *msg, void *ctx));
+void free_nsq_reader(struct NSQReader *rdr);
+int nsq_reader_connect_to_nsqd(struct NSQReader *rdr, const char *address, int port);
+int nsq_reader_add_nsqlookupd_endpoint(struct NSQReader *rdr, const char *address, int port);
+void nsq_reader_set_loop(struct NSQReader *rdr, struct ev_loop *loop);
+void nsq_run(struct ev_loop *loop);
+
+struct NSQDConnection {
+    struct BufferedSocket *bs;
+    struct Buffer *command_buf;
+    uint32_t current_msg_size;
+    uint32_t current_frame_type;
+    char *current_data;
+    struct ev_loop *loop;
+    void (*connect_callback)(struct NSQDConnection *conn, void *arg);
+    void (*close_callback)(struct NSQDConnection *conn, void *arg);
+    void (*msg_callback)(struct NSQDConnection *conn, struct NSQMessage *msg, void *arg);
+    void *arg;
+    struct NSQDConnection *next;
+};
+
+struct NSQDConnection *new_nsqd_connection(struct ev_loop *loop, const char *address, int port,
+    void (*connect_callback)(struct NSQDConnection *conn, void *arg),
+    void (*close_callback)(struct NSQDConnection *conn, void *arg),
+    void (*msg_callback)(struct NSQDConnection *conn, struct NSQMessage *msg, void *arg),
+    void *arg);
+void free_nsqd_connection(struct NSQDConnection *conn);
+int nsqd_connection_connect(struct NSQDConnection *conn);
+void nsqd_connection_disconnect(struct NSQDConnection *conn);
+
+void nsq_subscribe(struct Buffer *buf, const char *topic, const char *channel);
+void nsq_ready(struct Buffer *buf, int count);
+void nsq_finish(struct Buffer *buf, const char *id);
+void nsq_requeue(struct Buffer *buf, const char *id, int timeout_ms);
+void nsq_nop(struct Buffer *buf);
+
+struct NSQMessage {
+    int64_t timestamp;
+    uint16_t attempts;
+    char id[16+1];
+    size_t body_length;
+    char *body;
+};
+
+struct NSQMessage *nsq_decode_message(const char *data, size_t data_length);
+void free_nsq_message(struct NSQMessage *msg);
+
+struct NSQLookupdEndpoint {
+    char *address;
+    int port;
+    struct NSQLookupdEndpoint *next;
+};
+
+struct NSQLookupdEndpoint *new_nsqlookupd_endpoint(const char *address, int port);
+void free_nsqlookupd_endpoint(struct NSQLookupdEndpoint *nsqlookupd_endpoint);
+
+#endif
diff --git a/modules/nsq/nsq_json.c b/modules/nsq/nsq_json.c
new file mode 100644 (file)
index 0000000..216a868
--- /dev/null
@@ -0,0 +1,302 @@
+/*
+ *
+ * This file is part of Kamailio, a free SIP server.
+ *
+ * Kamailio is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation; either version 2 of the License, or
+ * (at your option) any later version
+ *
+ * Kamailio is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program; if not, write to the Free Software
+ * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
+ *
+ */
+
+#include "nsq_json.h"
+
+# define json_foreach_key(obj,key) \
+       char *key;\
+       struct lh_entry *entry ## key; \
+       struct lh_entry *entry_next ## key = NULL; \
+       for(entry ## key = json_object_get_object(obj)->head; \
+               (entry ## key ? ( \
+                       key = (char*)entry ## key->k, \
+                       entry_next ## key = entry ## key->next, \
+                       entry ## key) : 0); \
+               entry ## key = entry_next ## key)
+
+
+
+static str nsq_pv_str_empty = {"", 0};
+
+char **str_split(char* a_str, const char a_delim)
+{
+       char** result    = 0;
+       size_t count     = 0;
+       char* tmp        = a_str;
+       char* last_comma = 0;
+       char delim[2];
+       delim[0] = a_delim;
+       delim[1] = 0;
+       int len = 0;
+
+       /* Count how many elements will be extracted. */
+       while (*tmp) {
+               if (a_delim == *tmp) {
+                       count++;
+                       last_comma = tmp;
+               }
+               tmp++;
+       }
+
+       /* Add space for trailing token. */
+       count += last_comma < (a_str + strlen(a_str) - 1);
+
+       /* Add space for terminating null string so caller
+          knows where the list of returned strings ends. */
+       count++;
+
+       result = pkg_malloc(sizeof(char*) * count);
+
+       if (result) {
+               size_t idx  = 0;
+               char* token = strtok(a_str, delim);
+
+               while (token) {
+                       assert(idx < count);
+                       len = strlen(token);
+                       char* ptr = pkg_malloc( (len+1) * sizeof(char));
+                       *(result + idx) = ptr;
+                       memcpy(ptr, token, len);
+                       ptr[len] = '\0';
+                       int i = 0;
+                       while(i < len) {
+                               if (ptr[i] == nsq_json_escape_char)
+                                       ptr[i] = '.';
+                               i++;
+                       }
+                       token = strtok(0, delim);
+                       idx++;
+               }
+               assert(idx == count - 1);
+               *(result + idx) = 0;
+       }
+
+       return result;
+}
+
+struct json_object * nsq_json_get_field_object(str* json, str* field)
+{
+       char** tokens;
+       char* dup;
+       char f1[25], f2[25];//, f3[25];
+       int i;
+
+       dup = pkg_malloc(json->len+1);
+       memcpy(dup, json->s, json->len);
+       dup[json->len] = '\0';
+       struct json_object *j = json_tokener_parse(dup);
+       pkg_free(dup);
+
+       if (is_error(j)) {
+               LM_ERR("empty or invalid JSON\n");
+               return NULL;
+       }
+
+       struct json_object *jtree = NULL;
+       struct json_object *ret = NULL;
+
+       LM_DBG("getting json %.*s\n", field->len, field->s);
+
+       dup = pkg_malloc(field->len+1);
+       memcpy(dup, field->s, field->len);
+       dup[field->len] = '\0';
+       tokens = str_split(dup, '.');
+       pkg_free(dup);
+
+       if (tokens) {
+               jtree = j;
+               for (i = 0; *(tokens + i); i++) {
+                       if (jtree != NULL) {
+                               str field = str_init(*(tokens + i));
+                               // check for idx []
+                               int sresult = sscanf(field.s, "%[^[][%[^]]]", f1, f2); //, f3);
+                               LM_DBG("CHECK IDX %d - %s , %s, %s\n", sresult, field.s, f1, (sresult > 1? f2 : "(null)"));
+
+                               jtree = nsq_json_get_object(jtree, f1);
+                               if (jtree != NULL) {
+                                       char *value = (char*)json_object_get_string(jtree);
+                                       LM_DBG("JTREE OK %s\n", value);
+                               }
+                               if (jtree != NULL && sresult > 1 && json_object_is_type(jtree, json_type_array)) {
+                                       int idx = atoi(f2);
+                                       jtree = json_object_array_get_idx(jtree, idx);
+                                       if (jtree != NULL) {
+                                               char *value = (char*)json_object_get_string(jtree);
+                                               LM_DBG("JTREE IDX OK %s\n", value);
+                                       }
+                               }
+                       }
+                       pkg_free(*(tokens + i));
+               }
+               pkg_free(tokens);
+       }
+
+
+       if (jtree != NULL)
+               ret = json_object_get(jtree);
+
+       json_object_put(j);
+
+       return ret;
+}
+
+
+int nsq_json_get_field_ex(str* json, str* field, pv_value_p dst_val)
+{
+       struct json_object *jtree = nsq_json_get_field_object(json, field);
+
+
+       if (jtree != NULL) {
+               char *value = (char*)json_object_get_string(jtree);
+               int len = strlen(value);
+               dst_val->rs.s = pkg_malloc(len+1);
+               memcpy(dst_val->rs.s, value, len);
+               dst_val->rs.s[len] = '\0';
+               dst_val->rs.len = len;
+               dst_val->flags = PV_VAL_STR | PV_VAL_PKG;
+               dst_val->ri = 0;
+               json_object_put(jtree);
+       } else {
+               dst_val->flags = PV_VAL_NULL;
+               dst_val->rs = nsq_pv_str_empty;
+               dst_val->ri = 0;
+       }
+       return 1;
+}
+
+
+int nsq_json_get_field(struct sip_msg* msg, char* json, char* field, char* dst)
+{
+       str json_s;
+       str field_s;
+       pv_spec_t *dst_pv;
+       pv_value_t dst_val;
+
+       if (fixup_get_svalue(msg, (gparam_p)json, &json_s) != 0) {
+               LM_ERR("cannot get json string value\n");
+               return -1;
+       }
+
+       if (fixup_get_svalue(msg, (gparam_p)field, &field_s) != 0) {
+               LM_ERR("cannot get field string value\n");
+               return -1;
+       }
+
+       if (nsq_json_get_field_ex(&json_s, &field_s, &dst_val) != 1)
+               return -1;
+
+       dst_pv = (pv_spec_t *)dst;
+       dst_pv->setf(msg, &dst_pv->pvp, (int)EQ_T, &dst_val);
+       if (dst_val.flags & PV_VAL_PKG) {
+               pkg_free(dst_val.rs.s);
+       } else if (dst_val.flags & PV_VAL_SHM) {
+               shm_free(dst_val.rs.s);
+       }
+
+       return 1;
+}
+
+struct json_object* nsq_json_parse(const char *str)
+{
+       struct json_tokener* tok;
+       struct json_object* obj;
+
+       tok = json_tokener_new();
+       if (!tok) {
+               LM_ERR("Error parsing json: could not allocate tokener\n");
+               return NULL;
+       }
+
+       obj = json_tokener_parse_ex(tok, str, -1);
+       if (tok->err != json_tokener_success) {
+               LM_ERR("Error parsing json: %s\n", json_tokener_error_desc(tok->err));
+               LM_ERR("%s\n", str);
+               if (obj != NULL) {
+                       json_object_put(obj);
+               }
+               obj = NULL;
+       }
+
+       json_tokener_free(tok);
+       return obj;
+}
+
+struct json_object* nsq_json_get_object(struct json_object* jso, const char *key)
+{
+       struct json_object *result = NULL;
+       json_object_object_get_ex(jso, key, &result);
+       return result;
+}
+
+int nsq_json_get_keys(struct sip_msg* msg, char* json, char* field, char* dst)
+{
+       str json_s;
+       str field_s;
+       int_str keys_avp_name;
+       unsigned short keys_avp_type;
+       pv_spec_t *avp_spec;
+
+       if (fixup_get_svalue(msg, (gparam_p)json, &json_s) != 0) {
+               LM_ERR("cannot get json string value\n");
+               return -1;
+       }
+
+       if (fixup_get_svalue(msg, (gparam_p)field, &field_s) != 0) {
+               LM_ERR("cannot get field string value\n");
+               return -1;
+       }
+
+       if (dst == NULL){
+               LM_ERR("avp spec is null\n");
+               return -1;
+       }
+
+       avp_spec = (pv_spec_t *)dst;
+
+       if (avp_spec->type != PVT_AVP) {
+               LM_ERR("invalid avp spec\n");
+               return -1;
+       }
+
+       if (pv_get_avp_name(0, &avp_spec->pvp, &keys_avp_name, &keys_avp_type)!=0) {
+               LM_ERR("invalid AVP definition\n");
+               return -1;
+       }
+
+       struct json_object *jtree = nsq_json_get_field_object(&json_s, &field_s);
+
+       if (jtree != NULL) {
+               json_foreach_key(jtree, k) {
+                       LM_DBG("ITERATING KEY %s\n", k);
+                       int_str v1;
+                       v1.s.s = k;
+                       v1.s.len = strlen(k);
+                       if (add_avp(AVP_VAL_STR|keys_avp_type, keys_avp_name, v1) < 0) {
+                               LM_ERR("failed to create AVP\n");
+                           json_object_put(jtree);
+                               return -1;
+                       }
+               }
+           json_object_put(jtree);
+       }
+
+       return 1;
+}
+
diff --git a/modules/nsq/nsq_json.h b/modules/nsq/nsq_json.h
new file mode 100644 (file)
index 0000000..db1d56e
--- /dev/null
@@ -0,0 +1,53 @@
+/*
+ *
+ * This file is part of Kamailio, a free SIP server.
+ *
+ * Kamailio is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation; either version 2 of the License, or
+ * (at your option) any later version
+ *
+ * Kamailio is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program; if not, write to the Free Software
+ * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
+ *
+ */
+
+#ifndef __NSQ_JSON_H_
+#define __NSQ_JSON_H_
+
+#include "../../mod_fix.h"
+#include "../../lvalue.h"
+#include "../../parser/msg_parser.h"
+#include <json.h>
+
+#define json_extract_field(json_name, field)  do {                      \
+       struct json_object* obj =  nsq_json_get_object(json_obj, json_name); \
+       field.s = (char*)json_object_get_string(obj);                       \
+       if (field.s == NULL) {                                              \
+         LM_DBG("Json-c error - failed to extract field [%s]\n", json_name); \
+         field.s = "";                                                     \
+       } else {                                                            \
+         field.len = strlen(field.s);                                      \
+       }                                                                   \
+       LM_DBG("%s: [%s]\n", json_name, field.s?field.s:"Empty");           \
+  } while (0);
+
+
+extern char nsq_json_escape_char;
+extern str nsq_event_key;
+extern str nsq_event_sub_key;
+
+int nsq_json_get_field(struct sip_msg* msg, char* json, char* field, char* dst);
+int nsq_json_get_field_ex(str* json, str* field, pv_value_p dst_val);
+int nsq_json_get_keys(struct sip_msg* msg, char* json, char* field, char* dst);
+
+struct json_object* nsq_json_parse(const char *str);
+struct json_object* nsq_json_get_object(struct json_object* jso, const char *key);
+
+#endif /* __NSQ_JSON_H_ */
diff --git a/modules/nsq/nsq_mod.c b/modules/nsq/nsq_mod.c
new file mode 100644 (file)
index 0000000..3db563e
--- /dev/null
@@ -0,0 +1,361 @@
+/*
+ * NSQ module interface
+ *
+ * Copyright (C) 2016 Weave Communications
+ *
+ * This file is part of Kamailio, a free SIP server.
+ *
+ * Kamailio is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation; either version 2 of the License, or
+ * (at your option) any later version
+ *
+ * Kamailio is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program; if not, write to the Free Software
+ * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
+ *
+ * History:
+ * --------
+ * 2016-03  first version (Weave Communications)
+ */
+
+#include "nsq_mod.h"
+
+MODULE_VERSION
+
+static tr_export_t mod_trans[] = {
+       { {"nsq", sizeof("nsq")-1}, nsq_tr_parse},
+       { { 0, 0 }, 0 }
+};
+
+static pv_export_t nsq_mod_pvs[] = {
+       {{"nsqE", (sizeof("nsqE")-1)}, PVT_OTHER, nsq_pv_get_event_payload, 0, 0, 0, 0, 0},
+       { {0, 0}, 0, 0, 0, 0, 0, 0, 0 }
+};
+
+static cmd_export_t cmds[] = {
+       {"nsq_pua_publish", (cmd_function) nsq_pua_publish, 1, 0, 0, ANY_ROUTE},
+       {0, 0, 0, 0, 0, 0}
+};
+
+static param_export_t params[]=
+{
+       {"consumer_workers", INT_PARAM, &dbn_consumer_workers},
+       {"max_in_flight", INT_PARAM, &nsq_max_in_flight},
+       {"lookupd_address", PARAM_STR, &nsq_lookupd_address},
+       {"lookupd_port", INT_PARAM, &lookupd_port},
+       {"consumer_use_nsqd", INT_PARAM, &consumer_use_nsqd}, // consume messages from nsqd instead of lookupd
+       {"topic_channel", PARAM_STRING|USE_FUNC_PARAM, (void*)nsq_add_topic_channel},
+       {"nsqd_address", PARAM_STR, &nsqd_address},
+       {"nsqd_port", INT_PARAM, &nsqd_port},
+       {"consumer_event_key", PARAM_STR, &nsq_event_key},
+       {"consumer_event_subkey", PARAM_STR, &nsq_event_sub_key},
+       {"pua_include_entity", INT_PARAM, &dbn_include_entity},
+       {"presentity_table", PARAM_STR, &nsq_presentity_table},
+       {"db_url", PARAM_STR, &nsq_db_url},
+       {"pua_mode", INT_PARAM, &dbn_pua_mode},
+       {"json_escape_char", PARAM_STR, &nsq_json_escape_str},
+       { 0, 0, 0 }
+};
+
+static void free_tc_list(nsq_topic_channel_t *tcl)
+{
+       nsq_topic_channel_t *tc, *prev_tc;
+       tc = tcl;
+       while (tc) {
+               prev_tc = tc;
+               tc = tc->next;
+               free(tc->topic);
+               free(tc->channel);
+               pkg_free(prev_tc);
+       }
+       tcl = NULL;
+}
+
+static int nsq_add_topic_channel(modparam_t type, void *val)
+{
+       nsq_topic_channel_t* tc;
+       size_t size;
+       char *channel = (char*)val;
+       char *topic;
+       char *sep = NULL;
+
+       sep = strchr(channel, ':');
+       if (!sep) {
+               topic = (char*)val;
+               channel = DEFAULT_CHANNEL;
+               LM_ERR("delimiter (\":\") not found inside topic_channel param, using default channel [%s]\n", channel);
+       } else {
+               topic = strsep(&channel, ":");
+       }
+       size = sizeof(nsq_topic_channel_t);
+       tc = (nsq_topic_channel_t*)pkg_malloc(size);
+       if (tc == NULL) {
+               LM_ERR("memory error!\n");
+               free_tc_list(tc_list);
+               return -1;
+       }
+       memset(tc, 0, size);
+       tc->topic = strdup(topic);
+       tc->channel = strdup(channel);
+       ++nsq_topic_channel_counter;
+
+       tc->next = tc_list;
+       tc_list = tc;
+
+       return 0;
+}
+
+struct module_exports exports = {
+       "nsq",
+       DEFAULT_DLFLAGS,                /* dlopen flags */
+       cmds,                                   /* Exported functions */
+       params,                                 /* Exported parameters */
+       0,                                              /* exported statistics */
+       0,                      /* exported MI functions */
+       nsq_mod_pvs,            /* exported pseudo-variables */
+       0,                                              /* extra processes */
+       mod_init,                               /* module initialization function */
+       0,                                              /* response function*/
+       mod_destroy,                    /* destroy function */
+       mod_child_init                  /* per-child init function */
+};
+
+static int fire_init_event(int rank)
+{
+       struct sip_msg *fmsg;
+       struct run_act_ctx ctx;
+       int rtb, rt;
+
+       LM_DBG("rank is (%d)\n", rank);
+       if (rank!=PROC_INIT)
+               return 0;
+
+       rt = route_get(&event_rt, "nsq:mod-init");
+       if (rt>=0 && event_rt.rlist[rt]!=NULL) {
+               LM_DBG("executing event_route[nsq:mod-init] (%d)\n", rt);
+               if (faked_msg_init()<0)
+                       return -1;
+               fmsg = faked_msg_next();
+               rtb = get_route_type();
+               set_route_type(REQUEST_ROUTE);
+               init_run_actions_ctx(&ctx);
+               run_top_route(event_rt.rlist[rt], fmsg, &ctx);
+               if (ctx.run_flags&DROP_R_F) {
+                       LM_ERR("exit due to 'drop' in event route\n");
+                       return -1;
+               }
+               set_route_type(rtb);
+       }
+
+       return 0;
+}
+
+static int mod_init(void)
+{
+       int i;
+       startup_time = (int) time(NULL);
+
+       if (dbn_pua_mode == 1) {
+               nsq_db_url.len = nsq_db_url.s ? strlen(nsq_db_url.s) : 0;
+               LM_DBG("db_url=%s/%d/%p\n", ZSW(nsq_db_url.s), nsq_db_url.len,nsq_db_url.s);
+               nsq_presentity_table.len = strlen(nsq_presentity_table.s);
+
+               if (nsq_db_url.len > 0) {
+
+                       /* binding to database module  */
+                       if (db_bind_mod(&nsq_db_url, &nsq_pa_dbf)) {
+                               LM_ERR("Database module not found\n");
+                               return -1;
+                       }
+
+                       if (!DB_CAPABILITY(nsq_pa_dbf, DB_CAP_ALL)) {
+                               LM_ERR("Database module does not implement all functions"
+                                               " needed by NSQ module\n");
+                               return -1;
+                       }
+
+                       nsq_pa_db = nsq_pa_dbf.init(&nsq_db_url);
+                       if (!nsq_pa_db) {
+                               LM_ERR("Connection to database failed\n");
+                               return -1;
+                       }
+
+                       nsq_pa_dbf.close(nsq_pa_db);
+                       nsq_pa_db = NULL;
+               }
+       }
+
+       LM_DBG("NSQ Workers per Topic/Channel: %d\n", dbn_consumer_workers);
+       if (!nsq_topic_channel_counter) {
+               nsq_topic_channel_counter = 1;
+       }
+       LM_DBG("NSQ Total Topic/Channel: %d\n", nsq_topic_channel_counter);
+       dbn_consumer_workers = dbn_consumer_workers * nsq_topic_channel_counter;
+       LM_DBG("NSQ Total Workers: %d\n", dbn_consumer_workers);
+       int total_workers = dbn_consumer_workers + 2;
+
+       register_procs(total_workers);
+       cfg_register_child(total_workers);
+
+       if (pipe(nsq_cmd_pipe_fds) < 0) {
+               LM_ERR("cmd pipe() failed\n");
+               return -1;
+       }
+
+       nsq_worker_pipes_fds = (int*) shm_malloc(sizeof(int) * (dbn_consumer_workers) * 2 );
+       nsq_worker_pipes = (int*) shm_malloc(sizeof(int) * dbn_consumer_workers);
+       for (i=0; i < dbn_consumer_workers; i++) {
+               nsq_worker_pipes_fds[i*2] = nsq_worker_pipes_fds[i*2+1] = -1;
+               if (pipe(&nsq_worker_pipes_fds[i*2]) < 0) {
+                       LM_ERR("worker pipe(%d) failed\n", i);
+                       return -1;
+               }
+       }
+
+       nsq_cmd_pipe = nsq_cmd_pipe_fds[1];
+       for (i=0; i < dbn_consumer_workers; i++) {
+               nsq_worker_pipes[i] = nsq_worker_pipes_fds[i*2+1];
+       }
+
+       return 0;
+}
+
+int mod_register(char *path, int *dlflags, void *p1, void *p2)
+{
+       if (nsq_tr_init_buffers() < 0) {
+               LM_ERR("failed to initialize transformations buffers\n");
+               return -1;
+       }
+       return register_trans_mod(path, mod_trans);
+}
+
+
+int set_non_blocking(int fd)
+{
+       int flags;
+
+       flags = fcntl(fd, F_GETFL);
+       if (flags < 0)
+               return flags;
+       flags |= O_NONBLOCK;
+       if (fcntl(fd, F_SETFL, flags) < 0)
+               return -1;
+
+       return 0;
+}
+
+/**
+ *
+ */
+int nsq_consumer_worker_proc(int cmd_pipe, char *topic, char *channel)
+{
+       struct ev_loop *loop;
+       loop = ev_default_loop(0);
+       struct NSQReader *rdr;
+       void *ctx = NULL; //(void *)(new TestNsqMsgContext());
+       static char address[128];
+
+       if (loop == NULL) {
+               LM_ERR("cannot get libev loop\n");
+       }
+       set_non_blocking(cmd_pipe);
+
+       LM_DBG("NSQ Worker connecting to NSQ Topic [%s] and NSQ Channel [%s]\n", topic, channel);
+       // setup the reader
+       rdr = new_nsq_reader(loop, topic, channel, (void *)ctx, NULL, NULL, nsq_message_handler);
+
+       if (consumer_use_nsqd == 0) {
+               snprintf(address, 128, "%.*s", nsq_lookupd_address.len, nsq_lookupd_address.s);
+               nsq_reader_add_nsqlookupd_endpoint(rdr, address, lookupd_port);
+       } else {
+               snprintf(address, 128, "%.*s", nsqd_address.len, nsqd_address.s);
+               nsq_reader_connect_to_nsqd(rdr, address, nsqd_port);
+       }
+
+       nsq_run(loop);
+       return 0;
+}
+
+/**
+ * @brief Initialize async module children
+ */
+static int mod_child_init(int rank)
+{
+       int pid;
+       int i;
+       int workers = dbn_consumer_workers / nsq_topic_channel_counter;
+
+       fire_init_event(rank);
+
+       if (rank==PROC_INIT || rank==PROC_TCP_MAIN)
+               return 0;
+
+       if (rank==PROC_MAIN) {
+               nsq_topic_channel_t *tc;
+
+               tc = tc_list;
+               if (tc == NULL) {
+                       LM_ERR("topic and channel not set, using defaults\n");
+                       for(i = 0; i < workers; i++) {
+                               pid=fork_process(i+1, "NSQ Consumer Worker", 1);
+                               if (pid<0)
+                                       return -1; /* error */
+                               if (pid==0){
+                                       close(nsq_worker_pipes_fds[i*2+1]);
+                                       return(nsq_consumer_worker_proc(nsq_worker_pipes_fds[i*2], DEFAULT_TOPIC, DEFAULT_CHANNEL));
+                               }
+                       }
+               } else {
+                       while (tc) {
+                               for(i = 0; i < workers; i++) {
+                                       pid=fork_process(i+1, "NSQ Consumer Worker", 1);
+                                       if (pid<0)
+                                               return -1; /* error */
+                                       if (pid==0){
+                                               close(nsq_worker_pipes_fds[i*2+1]);
+                                               return(nsq_consumer_worker_proc(nsq_worker_pipes_fds[i*2], tc->topic, tc->channel));
+                                       }
+                               }
+                               tc = tc->next;
+                       }
+               }
+
+               return 0;
+       }
+
+       if (dbn_pua_mode == 1) {
+               if (nsq_pa_dbf.init == 0) {
+                       LM_CRIT("child_init: database not bound\n");
+                       return -1;
+               }
+               nsq_pa_db = nsq_pa_dbf.init(&nsq_db_url);
+               if (!nsq_pa_db) {
+                       LM_ERR("child %d: unsuccessful connecting to database\n", rank);
+                       return -1;
+               }
+
+               if (nsq_pa_dbf.use_table(nsq_pa_db, &nsq_presentity_table) < 0) {
+                       LM_ERR( "child %d:unsuccessful use_table presentity_table\n", rank);
+                       return -1;
+               }
+               LM_DBG("child %d: Database connection opened successfully\n", rank);
+       }
+
+       return 0;
+}
+
+
+/**
+ * destroy module function
+ */
+static void mod_destroy(void) {
+       free_tc_list(tc_list);
+       shm_free(nsq_worker_pipes_fds);
+       shm_free(nsq_worker_pipes);
+}
diff --git a/modules/nsq/nsq_mod.h b/modules/nsq/nsq_mod.h
new file mode 100644 (file)
index 0000000..03c2047
--- /dev/null
@@ -0,0 +1,68 @@
+#ifndef __NSQ_MOD_H_
+#define __NSQ_MOD_H_
+
+#include <sys/types.h>
+#include <sys/wait.h>
+
+#include "../../cfg/cfg_struct.h"
+#include "../../lib/srdb1/db.h"
+#include "nsq_reader.h"
+#include "nsq_trans.h"
+#include "nsq_pua.h"
+
+#define DBN_DEFAULT_NO_WORKERS 4
+#define LOOKUPD_ADDRESS "127.0.0.1"
+#define CONSUMER_EVENT_KEY "Event-Category"
+#define CONSUMER_EVENT_SUB_KEY "Event-Name"
+#define DEFAULT_CHANNEL "Kamailio-Channel"
+#define DEFAULT_TOPIC "Kamailio-Topic"
+#define NSQD_ADDRESS "127.0.0.1"
+#define PRESENTITY_TABLE "presentity"
+
+typedef struct nsq_topic_channel
+{
+       char *topic;
+       char *channel;
+       struct nsq_topic_channel *next;
+} nsq_topic_channel_t;
+
+
+int nsq_workers = 1;
+int nsq_max_in_flight = 1;
+int consumer_use_nsqd = 0;
+str nsq_lookupd_address = str_init(LOOKUPD_ADDRESS);
+int lookupd_port = 4161;
+str nsq_event_key = str_init(CONSUMER_EVENT_KEY);
+str nsq_event_sub_key = str_init(CONSUMER_EVENT_SUB_KEY);
+str nsqd_address = str_init(NSQD_ADDRESS);
+int nsqd_port = 4150;
+int dbn_pua_mode = 1;
+int dbn_include_entity = 1;
+
+nsq_topic_channel_t *tc_list = NULL;
+str nsq_json_escape_str = str_init("%");
+char nsq_json_escape_char = '%';
+
+int nsq_topic_channel_counter = 0;
+int dbn_consumer_workers = DBN_DEFAULT_NO_WORKERS;
+int startup_time = 0;
+int *nsq_worker_pipes_fds = NULL;
+int *nsq_worker_pipes = NULL;
+int nsq_cmd_pipe = 0;
+int nsq_cmd_pipe_fds[2] = {-1,-1};
+
+/* database connection */
+db1_con_t *nsq_pa_db = NULL;
+db_func_t nsq_pa_dbf;
+str nsq_presentity_table = str_init(PRESENTITY_TABLE);
+str nsq_db_url = {NULL, 0};
+
+static int mod_init(void);
+static int mod_child_init(int);
+static int nsq_add_topic_channel(modparam_t type, void* val);
+static void free_tc_list(nsq_topic_channel_t *tc_list);
+static void mod_destroy(void);
+
+int nsq_pv_get_event_payload(struct sip_msg*, pv_param_t*, pv_value_t*);
+
+#endif
diff --git a/modules/nsq/nsq_pua.c b/modules/nsq/nsq_pua.c
new file mode 100644 (file)
index 0000000..1fa8826
--- /dev/null
@@ -0,0 +1,521 @@
+/*
+ *
+ * This file is part of Kamailio, a free SIP server.
+ *
+ * Kamailio is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation; either version 2 of the License, or
+ * (at your option) any later version
+ *
+ * Kamailio is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program; if not, write to the Free Software
+ * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
+ *
+ */
+
+#include "../presence/bind_presence.h"
+#include "defs.h"
+#include "nsq_json.h"
+#include "nsq_pua.h"
+
+extern db1_con_t *nsq_pa_db;
+extern db_func_t nsq_pa_dbf;
+extern str nsq_presentity_table;
+extern str nsq_db_url;
+
+extern int dbn_include_entity;
+extern int dbn_pua_mode;
+
+str str_event_message_summary = str_init("message-summary");
+str str_event_dialog = str_init("dialog");
+str str_event_presence = str_init("presence");
+
+str str_username_col = str_init("username");
+str str_domain_col = str_init("domain");
+str str_body_col = str_init("body");
+str str_expires_col = str_init("expires");
+str str_received_time_col = str_init("received_time");
+str str_presentity_uri_col = str_init("presentity_uri");
+str str_priority_col = str_init("priority");
+
+str str_event_col = str_init("event");
+str str_contact_col = str_init("contact");
+str str_callid_col = str_init("callid");
+str str_from_tag_col = str_init("from_tag");
+str str_to_tag_col = str_init("to_tag");
+str str_etag_col = str_init("etag");
+str str_sender_col = str_init("sender");
+
+str str_presence_note_busy = str_init("Busy");
+str str_presence_note_otp = str_init("On the Phone");
+str str_presence_note_idle = str_init("Idle");
+str str_presence_note_offline = str_init("Offline");
+str str_presence_act_busy = str_init("<rpid:busy/>");
+str str_presence_act_otp = str_init("<rpid:on-the-phone/>");
+str str_presence_status_offline = str_init("closed");
+str str_presence_status_online = str_init("open");
+
+str str_null_string = str_init("NULL");
+
+int nsq_pua_update_presentity(str* event, str* realm, str* user, str* etag, str* sender, str* body, int expires, int reset)
+{
+       db_key_t query_cols[13];
+       db_op_t  query_ops[13];
+       db_val_t query_vals[13];
+       int n_query_cols = 0;
+       int ret = -1;
+       int use_replace = 1;
+
+       query_cols[n_query_cols] = &str_event_col;
+       query_ops[n_query_cols] = OP_EQ;
+       query_vals[n_query_cols].type = DB1_STR;
+       query_vals[n_query_cols].nul = 0;
+       query_vals[n_query_cols].val.str_val = *event;
+       n_query_cols++;
+
+       query_cols[n_query_cols] = &str_domain_col;
+       query_ops[n_query_cols] = OP_EQ;
+       query_vals[n_query_cols].type = DB1_STR;
+       query_vals[n_query_cols].nul = 0;
+       query_vals[n_query_cols].val.str_val = *realm;
+       n_query_cols++;
+
+       query_cols[n_query_cols] = &str_username_col;
+       query_ops[n_query_cols] = OP_EQ;
+       query_vals[n_query_cols].type = DB1_STR;
+       query_vals[n_query_cols].nul = 0;
+       query_vals[n_query_cols].val.str_val = *user;
+       n_query_cols++;
+
+       query_cols[n_query_cols] = &str_etag_col;
+       query_ops[n_query_cols] = OP_EQ;
+       query_vals[n_query_cols].type = DB1_STR;
+       query_vals[n_query_cols].nul = 0;
+       query_vals[n_query_cols].val.str_val = *etag;
+       n_query_cols++;
+
+       query_cols[n_query_cols] = &str_sender_col;
+       query_vals[n_query_cols].type = DB1_STR;
+       query_vals[n_query_cols].nul = 0;
+       query_vals[n_query_cols].val.str_val = *sender;
+       n_query_cols++;
+
+       query_cols[n_query_cols] = &str_body_col;
+       query_vals[n_query_cols].type = DB1_BLOB;
+       query_vals[n_query_cols].nul = 0;
+       query_vals[n_query_cols].val.str_val = *body;
+       n_query_cols++;
+
+       query_cols[n_query_cols] = &str_received_time_col;
+       query_vals[n_query_cols].type = DB1_INT;
+       query_vals[n_query_cols].nul = 0;
+       query_vals[n_query_cols].val.int_val = (int)time(NULL);
+       n_query_cols++;
+
+       query_cols[n_query_cols] = &str_expires_col;
+       query_vals[n_query_cols].type = DB1_INT;
+       query_vals[n_query_cols].nul = 0;
+       query_vals[n_query_cols].val.int_val = expires;
+       n_query_cols++;
+
+       query_cols[n_query_cols] = &str_priority_col;
+       query_vals[n_query_cols].type = DB1_INT;
+       query_vals[n_query_cols].nul = 0;
+       query_vals[n_query_cols].val.int_val = 0;
+       n_query_cols++;
+
+       if (nsq_pa_dbf.use_table(nsq_pa_db, &nsq_presentity_table) < 0) {
+               LM_ERR("unsuccessful use_table [%.*s]\n", nsq_presentity_table.len, nsq_presentity_table.s);
+               goto error;
+       }
+
+       if (nsq_pa_dbf.replace == NULL || reset > 0) {
+               use_replace = 0;
+               LM_DBG("using delete/insert instead of replace\n");
+       }
+
+       if (nsq_pa_dbf.start_transaction) {
+               if (nsq_pa_dbf.start_transaction(nsq_pa_db, DB_LOCKING_WRITE) < 0) {
+                       LM_ERR("in start_transaction\n");
+                       goto error;
+               }
+       }
+
+       if (use_replace) {
+               if (nsq_pa_dbf.replace(nsq_pa_db, query_cols, query_vals, n_query_cols, 4, 0) < 0) {
+                       LM_ERR("replacing record in database\n");
+                       if (nsq_pa_dbf.abort_transaction) {
+                               if (nsq_pa_dbf.abort_transaction(nsq_pa_db) < 0) {
+                                       LM_ERR("in abort_transaction\n");
+                               }
+                       }
+                       goto error;
+               }
+       } else {
+               if (nsq_pa_dbf.delete(nsq_pa_db, query_cols, query_ops, query_vals, 4-reset) < 0) {
+                       LM_ERR("deleting record in database\n");
+                       if (nsq_pa_dbf.abort_transaction) {
+                               if (nsq_pa_dbf.abort_transaction(nsq_pa_db) < 0)
+                                       LM_ERR("in abort_transaction\n");
+                       }
+                       goto error;
+               }
+               if (nsq_pa_dbf.insert(nsq_pa_db, query_cols, query_vals, n_query_cols) < 0) {
+                       LM_ERR("replacing record in database\n");
+                       if (nsq_pa_dbf.abort_transaction) {
+                               if (nsq_pa_dbf.abort_transaction(nsq_pa_db) < 0) {
+                                       LM_ERR("in abort_transaction\n");
+                               }
+                       }
+                       goto error;
+               }
+       }
+
+       if (nsq_pa_dbf.end_transaction) {
+               if (nsq_pa_dbf.end_transaction(nsq_pa_db) < 0) {
+                       LM_ERR("in end_transaction\n");
+                       goto error;
+               }
+       }
+
+error:
+
+       return ret;
+}
+
+int nsq_pua_publish_presence_to_presentity(struct json_object *json_obj) {
+       int ret = 1;
+       str from = { 0, 0 }, to = { 0, 0 };
+       str from_user = { 0, 0 }, to_user = { 0, 0 };
+       str from_realm = { 0, 0 }, to_realm = { 0, 0 };
+       str callid = { 0, 0 }, fromtag = { 0, 0 }, totag = { 0, 0 };
+       str state = { 0, 0 };
+       str direction = { 0, 0 };
+       str event = str_init("presence");
+       str presence_body = { 0, 0 };
+       str activity = str_init("");
+       str note = str_init("Available");
+       str status = str_presence_status_online;
+       int expires = 0;
+
+       char *body = (char *)pkg_malloc(PRESENCE_BODY_BUFFER_SIZE);
+       if (body == NULL) {
+               LM_ERR("Error allocating buffer for publish\n");
+               ret = -1;
+               goto error;
+       }
+
+       json_extract_field(BLF_JSON_FROM, from);
+       json_extract_field(BLF_JSON_FROM_USER, from_user);
+       json_extract_field(BLF_JSON_FROM_REALM, from_realm);
+       json_extract_field(BLF_JSON_TO, to);
+       json_extract_field(BLF_JSON_TO_USER, to_user);
+       json_extract_field(BLF_JSON_TO_REALM, to_realm);
+       json_extract_field(BLF_JSON_CALLID, callid);
+       json_extract_field(BLF_JSON_FROMTAG, fromtag);
+       json_extract_field(BLF_JSON_TOTAG, totag);
+       json_extract_field(BLF_JSON_DIRECTION, direction);
+       json_extract_field(BLF_JSON_STATE, state);
+
+       struct json_object *ExpiresObj =  nsq_json_get_object(json_obj, BLF_JSON_EXPIRES);
+       if (ExpiresObj != NULL) {
+               expires = json_object_get_int(ExpiresObj);
+               if (expires > 0)
+                       expires += (int)time(NULL);
+       }
+
+       if (!from_user.len || !to_user.len || !state.len) {
+               LM_ERR("missing one of From / To / State\n");
+               goto error;
+       }
+
+       if (!strcmp(state.s, "early")) {
+               note = str_presence_note_busy;
+               activity = str_presence_act_busy;
+
+       } else if (!strcmp(state.s, "confirmed")) {
+               note = str_presence_note_otp;
+               activity = str_presence_act_otp;
+
+       } else if (!strcmp(state.s, "offline")) {
+               note = str_presence_note_offline;
+               status = str_presence_status_offline;
+
+       }; // else {
+       //      note = str_presence_note_idle;
+       //}
+
+
+       sprintf(body, PRESENCE_BODY, from_user.s, callid.s, status.s, note.s, activity.s, note.s);
+
+       presence_body.s = body;
+       presence_body.len = strlen(body);
+
+       if (dbn_pua_mode == 1) {
+               nsq_pua_update_presentity(&event, &from_realm, &from_user, &callid, &from, &presence_body, expires, 1);
+       }
+
+ error:
+
+ if (body)
+         pkg_free(body);
+
+ return ret;
+
+}
+
+int nsq_pua_publish_mwi_to_presentity(struct json_object *json_obj) {
+       int ret = 1;
+       str event = str_init("message-summary");
+       str from = { 0, 0 }, to = { 0, 0 };
+       str from_user = { 0, 0 }, to_user = { 0, 0 };
+       str from_realm = { 0, 0 }, to_realm = { 0, 0 };
+       str callid = { 0, 0 }, fromtag = { 0, 0 }, totag = { 0, 0 };
+       str mwi_user = { 0, 0 }, mwi_waiting = { 0, 0 },
+               mwi_new = { 0, 0 }, mwi_saved = { 0, 0 },
+               mwi_urgent = { 0, 0 }, mwi_urgent_saved = { 0, 0 },
+               mwi_account = { 0, 0 }, mwi_body = { 0, 0 };
+       int expires = 0;
+
+       char *body = (char *)pkg_malloc(MWI_BODY_BUFFER_SIZE);
+       if (body == NULL) {
+               LM_ERR("Error allocating buffer for publish\n");
+               ret = -1;
+               goto error;
+       }
+
+       json_extract_field(BLF_JSON_FROM, from);
+       json_extract_field(BLF_JSON_FROM_USER, from_user);
+       json_extract_field(BLF_JSON_FROM_REALM, from_realm);
+       json_extract_field(BLF_JSON_TO, to);
+       json_extract_field(BLF_JSON_TO_USER, to_user);
+       json_extract_field(BLF_JSON_TO_REALM, to_realm);
+       json_extract_field(BLF_JSON_CALLID, callid);
+       json_extract_field(BLF_JSON_FROMTAG, fromtag);
+       json_extract_field(BLF_JSON_TOTAG, totag);
+
+       json_extract_field(MWI_JSON_TO, mwi_user);
+       json_extract_field(MWI_JSON_WAITING, mwi_waiting);
+       json_extract_field(MWI_JSON_NEW, mwi_new);
+       json_extract_field(MWI_JSON_SAVED, mwi_saved);
+       json_extract_field(MWI_JSON_URGENT, mwi_urgent);
+       json_extract_field(MWI_JSON_URGENT_SAVED, mwi_urgent_saved);
+       json_extract_field(MWI_JSON_ACCOUNT, mwi_account);
+
+       struct json_object *ExpiresObj =  nsq_json_get_object(json_obj, BLF_JSON_EXPIRES);
+       if (ExpiresObj != NULL) {
+               expires = json_object_get_int(ExpiresObj);
+               if (expires > 0)
+                       expires += (int)time(NULL);
+       }
+
+       sprintf(body, MWI_BODY, mwi_waiting.len, mwi_waiting.s,
+           mwi_account.len, mwi_account.s, mwi_new.len, mwi_new.s,
+           mwi_saved.len, mwi_saved.s, mwi_urgent.len, mwi_urgent.s,
+           mwi_urgent_saved.len, mwi_urgent_saved.s);
+
+       mwi_body.s = body;
+       mwi_body.len = strlen(body);
+
+       if (dbn_pua_mode == 1) {
+               nsq_pua_update_presentity(&event, &from_realm, &from_user, &callid, &from, &mwi_body, expires, 1);
+       }
+
+ error:
+
+   if (body)
+         pkg_free(body);
+
+
+   return ret;
+}
+
+
+int nsq_pua_publish_dialoginfo_to_presentity(struct json_object *json_obj) {
+       int ret = 1;
+       str from = { 0, 0 }, to = { 0, 0 }, pres = {0, 0};
+       str from_user = { 0, 0 }, to_user = { 0, 0 }, pres_user = { 0, 0 };
+       str from_realm = { 0, 0 }, to_realm = { 0, 0 }, pres_realm = { 0, 0 };
+       str from_uri = { 0, 0 }, to_uri = { 0, 0 };
+       str callid = { 0, 0 }, fromtag = { 0, 0 }, totag = { 0, 0 };
+       str state = { 0, 0 };
+       str direction = { 0, 0 };
+       char sender_buf[1024];
+       str sender = {0, 0};
+       str dialoginfo_body = {0 , 0};
+       int expires = 0;
+       str event = str_init("dialog");
+       int reset = 0;
+       char to_tag_buffer[100];
+       char from_tag_buffer[100];
+
+       char *body = (char *)pkg_malloc(DIALOGINFO_BODY_BUFFER_SIZE);
+       if (body == NULL) {
+               LM_ERR("Error allocating buffer for publish\n");
+               ret = -1;
+               goto error;
+       }
+
+
+       json_extract_field(BLF_JSON_PRES, pres);
+       json_extract_field(BLF_JSON_PRES_USER, pres_user);
+       json_extract_field(BLF_JSON_PRES_REALM, pres_realm);
+       json_extract_field(BLF_JSON_FROM, from);
+       json_extract_field(BLF_JSON_FROM_USER, from_user);
+       json_extract_field(BLF_JSON_FROM_REALM, from_realm);
+       json_extract_field(BLF_JSON_FROM_URI, from_uri);
+       json_extract_field(BLF_JSON_TO, to);
+       json_extract_field(BLF_JSON_TO_USER, to_user);
+       json_extract_field(BLF_JSON_TO_REALM, to_realm);
+       json_extract_field(BLF_JSON_TO_URI, to_uri);
+       json_extract_field(BLF_JSON_CALLID, callid);
+       json_extract_field(BLF_JSON_FROMTAG, fromtag);
+       json_extract_field(BLF_JSON_TOTAG, totag);
+       json_extract_field(BLF_JSON_DIRECTION, direction);
+       json_extract_field(BLF_JSON_STATE, state);
+
+       struct json_object *ExpiresObj =  nsq_json_get_object(json_obj, BLF_JSON_EXPIRES);
+       if (ExpiresObj != NULL) {
+               expires = json_object_get_int(ExpiresObj);
+               if (expires > 0)
+                       expires += (int)time(NULL);
+       }
+
+       ExpiresObj =  nsq_json_get_object(json_obj, "Flush-Level");
+       if (ExpiresObj != NULL) {
+               reset = json_object_get_int(ExpiresObj);
+       }
+
+       if (!from.len || !to.len || !state.len) {
+               LM_ERR("missing one of From / To / State\n");
+               goto error;
+       }
+
+       if (!pres.len || !pres_user.len || !pres_realm.len) {
+               pres = from;
+               pres_user = from_user;
+               pres_realm = from_realm;
+       }
+
+       if (!from_uri.len)
+               from_uri = from;
+
+       if (!to_uri.len)
+               to_uri = to;
+
+       if (fromtag.len > 0) {
+               fromtag.len = sprintf(from_tag_buffer, LOCAL_TAG, fromtag.len, fromtag.s);
+               fromtag.s = from_tag_buffer;
+       }
+
+       if (totag.len > 0) {
+               totag.len = sprintf(to_tag_buffer, REMOTE_TAG, totag.len, totag.s);
+               totag.s = to_tag_buffer;
+       }
+
+       if (callid.len) {
+
+               if (dbn_include_entity) {
+               sprintf(body, DIALOGINFO_BODY,
+                               pres.len, pres.s,
+                               callid.len, callid.s,
+                               callid.len, callid.s,
+                               fromtag.len, fromtag.s,
+                               totag.len, totag.s,
+                               direction.len, direction.s,
+                               state.len, state.s,
+                               from_user.len, from_user.s,
+                               from.len, from.s,
+                               from_uri.len, from_uri.s,
+                               to_user.len, to_user.s,
+                               to.len, to.s,
+                               to_uri.len, to_uri.s
+                               );
+               } else {
+
+               sprintf(body, DIALOGINFO_BODY_2,
+                               pres.len, pres.s,
+                               callid.len, callid.s,
+                               callid.len, callid.s,
+                               fromtag.len, fromtag.s,
+                               totag.len, totag.s,
+                               direction.len, direction.s,
+                               state.len, state.s,
+                               from_user.len, from_user.s,
+                               from.len, from.s,
+                               to_user.len, to_user.s,
+                               to.len, to.s
+                               );
+               }
+
+       } else {
+               sprintf(body, DIALOGINFO_EMPTY_BODY, pres.len, pres.s);
+       }
+
+
+       sprintf(sender_buf, "sip:%s", callid.s);
+       sender.s = sender_buf;
+       sender.len = strlen(sender_buf);
+
+       dialoginfo_body.s = body;
+       dialoginfo_body.len = strlen(body);
+
+       if (dbn_pua_mode == 1) {
+               nsq_pua_update_presentity(&event, &pres_realm, &pres_user, &callid, &sender, &dialoginfo_body, expires, reset);
+       }
+
+ error:
+
+   if (body)
+         pkg_free(body);
+
+
+ return ret;
+}
+
+
+int nsq_pua_publish(struct sip_msg* msg, char *json) {
+       str event_name = { 0, 0 }, event_package = { 0, 0 };
+       struct json_object *json_obj = NULL;
+       int ret = 1;
+
+       if (dbn_pua_mode != 1) {
+               LM_ERR("pua_mode must be 1 to publish\n");
+               ret = -1;
+               goto error;
+       }
+
+       /* extract info from json and construct xml */
+       json_obj = nsq_json_parse(json);
+       if (json_obj == NULL) {
+               ret = -1;
+               goto error;
+       }
+
+       json_extract_field(BLF_JSON_EVENT_NAME, event_name);
+
+       if (event_name.len == 6 && strncmp(event_name.s, "update", 6) == 0) {
+               json_extract_field(BLF_JSON_EVENT_PKG, event_package);
+               if (event_package.len == str_event_dialog.len
+                               && strncmp(event_package.s, str_event_dialog.s, event_package.len) == 0) {
+                       ret = nsq_pua_publish_dialoginfo_to_presentity(json_obj);
+               } else if (event_package.len == str_event_message_summary.len
+                               && strncmp(event_package.s, str_event_message_summary.s, event_package.len) == 0) {
+                       ret = nsq_pua_publish_mwi_to_presentity(json_obj);
+               } else if (event_package.len == str_event_presence.len
+                               && strncmp(event_package.s, str_event_presence.s, event_package.len) == 0) {
+                       ret = nsq_pua_publish_presence_to_presentity(json_obj);
+               }
+       }
+
+error:
+       if (json_obj)
+               json_object_put(json_obj);
+
+       return ret;
+}
diff --git a/modules/nsq/nsq_pua.h b/modules/nsq/nsq_pua.h
new file mode 100644 (file)
index 0000000..067b029
--- /dev/null
@@ -0,0 +1,8 @@
+#ifndef __NSQ_PUA_H_
+#define __NSQ_PUA_H_
+
+
+int nsq_pua_publish(struct sip_msg* msg, char *json);
+
+
+#endif
diff --git a/modules/nsq/nsq_reader.c b/modules/nsq/nsq_reader.c
new file mode 100644 (file)
index 0000000..d35b700
--- /dev/null
@@ -0,0 +1,147 @@
+/*
+ *
+ * This file is part of Kamailio, a free SIP server.
+ *
+ * Kamailio is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation; either version 2 of the License, or
+ * (at your option) any later version
+ *
+ * Kamailio is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.         See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program; if not, write to the Free Software
+ * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA     02111-1307      USA
+ *
+ */
+
+#include "nsq_reader.h"
+
+char *eventData = NULL;
+
+typedef struct json_object *json_obj_ptr;
+
+int nsq_pv_get_event_payload(struct sip_msg *msg, pv_param_t *param, pv_value_t *res)
+{
+       return eventData == NULL ? pv_get_null(msg, param, res) : pv_get_strzval(msg, param, res, eventData);
+}
+
+int nsq_consumer_fire_event(char *routename)
+{
+       struct sip_msg *fmsg;
+       struct run_act_ctx ctx;
+       int rtb, rt;
+
+       LM_DBG("searching event_route[%s]\n", routename);
+       rt = route_get(&event_rt, routename);
+       if (rt < 0 || event_rt.rlist[rt] == NULL) {
+               LM_DBG("route %s does not exist\n", routename);
+               return -2;
+       }
+       LM_DBG("executing event_route[%s] (%d)\n", routename, rt);
+       if (faked_msg_init()<0) {
+               return -2;
+       }
+       fmsg = faked_msg_next();
+       rtb = get_route_type();
+       set_route_type(REQUEST_ROUTE);
+       init_run_actions_ctx(&ctx);
+       run_top_route(event_rt.rlist[rt], fmsg, 0);
+       set_route_type(rtb);
+
+       return 0;
+}
+
+int nsq_consumer_event(char *payload, char *channel, char *topic)
+{
+       json_obj_ptr json_obj = NULL;
+       int ret = 0;
+       str ev_name = {0, 0}, ev_category = {0, 0};
+       char *k = NULL;
+       char buffer[512];
+       char *p;
+
+       eventData = payload;
+
+       json_obj = nsq_json_parse(payload);
+       if (json_obj == NULL) {
+               return 0;
+       }
+
+       k = pkg_malloc(nsq_event_key.len+1);
+       memcpy(k, nsq_event_key.s, nsq_event_key.len);
+       k[nsq_event_key.len] = '\0';
+       json_extract_field(k, ev_category);
+       pkg_free(k);
+
+       k = pkg_malloc(nsq_event_sub_key.len+1);
+       memcpy(k, nsq_event_sub_key.s, nsq_event_sub_key.len);
+       k[nsq_event_sub_key.len] = '\0';
+       json_extract_field(k, ev_name);
+       pkg_free(k);
+
+       sprintf(buffer, "nsq:consumer-event-%.*s-%.*s",ev_category.len, ev_category.s, ev_name.len, ev_name.s);
+       for (p=buffer ; *p; ++p) *p = tolower(*p);
+       for (p=buffer ; *p; ++p) if(*p == '_') *p = '-';
+       if (nsq_consumer_fire_event(buffer) != 0) {
+               sprintf(buffer, "nsq:consumer-event-%.*s", ev_category.len, ev_category.s);
+               for (p=buffer ; *p; ++p) *p = tolower(*p);
+               for (p=buffer ; *p; ++p) if(*p == '_') *p = '-';
+               if (nsq_consumer_fire_event(buffer) != 0) {
+                       sprintf(buffer, "nsq:consumer-event-%.*s-%.*s", nsq_event_key.len, nsq_event_key.s, nsq_event_sub_key.len, nsq_event_sub_key.s);
+                       for (p=buffer ; *p; ++p) *p = tolower(*p);
+                       for (p=buffer ; *p; ++p) if(*p == '_') *p = '-';
+                       if (nsq_consumer_fire_event(buffer) != 0) {
+                               sprintf(buffer, "nsq:consumer-event-%.*s", nsq_event_key.len, nsq_event_key.s);
+                               for (p=buffer ; *p; ++p) *p = tolower(*p);
+                               for (p=buffer ; *p; ++p) if(*p == '_') *p = '-';
+                               if (nsq_consumer_fire_event(buffer) != 0) {
+                                       sprintf(buffer, "nsq:consumer-event");
+                                       if (nsq_consumer_fire_event(buffer) != 0) {
+                                               LM_ERR("nsq:consumer-event not found");
+                                       }
+                               }
+                       }
+               }
+       }
+
+       if(json_obj)
+               json_object_put(json_obj);
+
+       eventData = NULL;
+
+       return ret;
+}
+
+void nsq_message_handler(struct NSQReader *rdr, struct NSQDConnection *conn, struct NSQMessage *msg, void *ctx)
+{
+       int ret = 0;
+
+       char *payload = (char*)shm_malloc(msg->body_length + 1);
+       if (!payload) {
+               LM_ERR("error allocating shared memory for payload");
+       }
+       strncpy(payload, msg->body, msg->body_length);
+       payload[msg->body_length] = 0;
+
+       ret = nsq_consumer_event(payload, rdr->channel, rdr->topic);
+
+       buffer_reset(conn->command_buf);
+
+       if (ret < 0) {
+               nsq_requeue(conn->command_buf, msg->id, 100);
+       } else {
+               nsq_finish(conn->command_buf, msg->id);
+       }
+       buffered_socket_write_buffer(conn->bs, conn->command_buf);
+
+       buffer_reset(conn->command_buf);
+       nsq_ready(conn->command_buf, rdr->max_in_flight);
+       buffered_socket_write_buffer(conn->bs, conn->command_buf);
+
+       free_nsq_message(msg);
+       shm_free(payload);
+}
diff --git a/modules/nsq/nsq_reader.h b/modules/nsq/nsq_reader.h
new file mode 100644 (file)
index 0000000..ec78432
--- /dev/null
@@ -0,0 +1,38 @@
+/*
+ *
+ * This file is part of Kamailio, a free SIP server.
+ *
+ * Kamailio is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation; either version 2 of the License, or
+ * (at your option) any later version
+ *
+ * Kamailio is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program; if not, write to the Free Software
+ * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
+ *
+ */
+
+#ifndef __NSQ_READER_H_
+#define __NSQ_READER_H_
+
+#include <json.h>
+
+#include "../../sr_module.h"
+#include "../../fmsg.h"
+#include "nsq.h"
+#include "nsq_json.h"
+
+
+int nsq_pv_get_event_payload(struct sip_msg *msg, pv_param_t *param, pv_value_t *res);
+int nsq_consumer_fire_event(char *routename);
+int nsq_consumer_event(char *payload, char *channel, char *topic);
+
+void nsq_message_handler(struct NSQReader *rdr, struct NSQDConnection *conn, struct NSQMessage *msg, void *ctx);
+
+#endif /* __NSQ_READER_H_ */
diff --git a/modules/nsq/nsq_trans.c b/modules/nsq/nsq_trans.c
new file mode 100644 (file)
index 0000000..cc3d84a
--- /dev/null
@@ -0,0 +1,450 @@
+/*
+ *
+ * This file is part of Kamailio, a free SIP server.
+ *
+ * Kamailio is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation; either version 2 of the License, or
+ * (at your option) any later version
+ *
+ * Kamailio is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program; if not, write to the Free Software
+ * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
+ *
+ */
+
+/*! \file
+ * \brief Support for transformations
+ */
+
+#include "../../trim.h"
+#include "../../mod_fix.h"
+
+#include "nsq_trans.h"
+#include "nsq_json.h"
+
+
+/*! transformation buffer size */
+#define NSQ_TR_BUFFER_SIZE 65536
+#define NSQ_TR_BUFFER_SLOTS    4
+
+/*! transformation buffer */
+static char **_nsq_tr_buffer_list = NULL;
+
+static char *_nsq_tr_buffer = NULL;
+
+static int _nsq_tr_buffer_idx = 0;
+
+#define NSQ_TR_ALLOC_PARSE_SIZE        2048
+
+static pv_spec_t**  _nsq_parse_specs  = NULL;
+static tr_param_t** _nsq_parse_params = NULL;
+static int _nsq_tr_parse_spec = 0;
+static int _nsq_tr_parse_params = 0;
+
+
+/*!
+ *
+ */
+int nsq_tr_init_buffers(void)
+{
+       int i;
+
+       _nsq_tr_buffer_list = (char**)malloc(NSQ_TR_BUFFER_SLOTS * sizeof(char*));
+
+       if (_nsq_tr_buffer_list==NULL)
+               return -1;
+       for (i=0; i<NSQ_TR_BUFFER_SLOTS; i++) {
+               _nsq_tr_buffer_list[i] = (char*)malloc(NSQ_TR_BUFFER_SIZE);
+               if(_nsq_tr_buffer_list[i]==NULL)
+                       return -1;
+       }
+
+       _nsq_parse_specs = (pv_spec_t**)malloc(NSQ_TR_ALLOC_PARSE_SIZE * sizeof(pv_spec_t*));
+       for (i=0; i < NSQ_TR_ALLOC_PARSE_SIZE; i++)
+               _nsq_parse_specs[i] = NULL;
+
+       _nsq_parse_params = (tr_param_t**)malloc(NSQ_TR_ALLOC_PARSE_SIZE * sizeof(tr_param_t*));
+       for (i=0; i < NSQ_TR_ALLOC_PARSE_SIZE; i++)
+               _nsq_parse_params[i] = NULL;
+
+       return 0;
+}
+
+void nsq_tr_clear_buffers(void)
+{
+       int i;
+       if (_nsq_tr_buffer_list != NULL) {
+               for (i=0; i<NSQ_TR_BUFFER_SLOTS; i++) {
+                       if (_nsq_tr_buffer_list[i] != NULL) {
+                               free(_nsq_tr_buffer_list[i]);
+                               _nsq_tr_buffer_list[i] = NULL;
+                       }
+               }
+               free(_nsq_tr_buffer_list);
+               _nsq_tr_buffer_list = NULL;
+       }
+
+       if (_nsq_parse_specs != NULL) {
+               for (i=0; i<NSQ_TR_ALLOC_PARSE_SIZE; i++) {
+                       if (_nsq_parse_specs[i] != NULL) {
+                               free(_nsq_parse_specs[i]);
+                               _nsq_parse_specs[i] = NULL;
+                       }
+               }
+               free(_nsq_parse_specs);
+               _nsq_parse_specs = NULL;
+       }
+
+       if (_nsq_parse_params != NULL) {
+               for (i=0; i<NSQ_TR_ALLOC_PARSE_SIZE; i++) {
+                       if (_nsq_parse_params[i] != NULL) {
+                               free(_nsq_parse_params[i]);
+                               _nsq_parse_params[i] = NULL;
+                       }
+               }
+               free(_nsq_parse_params);
+               _nsq_parse_params = NULL;
+       }
+
+}
+
+char *nsq_tr_set_crt_buffer(void)
+{
+       _nsq_tr_buffer = _nsq_tr_buffer_list[_nsq_tr_buffer_idx];
+       _nsq_tr_buffer_idx = (_nsq_tr_buffer_idx + 1) % NSQ_TR_BUFFER_SLOTS;
+       return _nsq_tr_buffer;
+}
+
+#define nsq_tr_string_clone_result do { \
+               if(val->rs.len> NSQ_TR_BUFFER_SIZE-1) { \
+                       LM_ERR("result is too big\n"); \
+                       return -1; \
+               } \
+               strncpy(_nsq_tr_buffer, val->rs.s, val->rs.len); \
+               val->rs.s = _nsq_tr_buffer; \
+       } while(0);
+
+void nsq_destroy_pv_value(pv_value_t *val)
+{
+       if (val->flags & PV_VAL_PKG)
+               pkg_free(val->rs.s);
+       else if (val->flags & PV_VAL_SHM)
+               shm_free(val->rs.s);
+       pkg_free(val);
+}
+
+void nsq_free_pv_value(pv_value_t *val )
+{
+       if (val->flags & PV_VAL_PKG)
+               pkg_free(val->rs.s);
+       else if (val->flags & PV_VAL_SHM)
+               shm_free(val->rs.s);
+}
+
+pv_value_t* nsq_alloc_pv_value() {
+       pv_value_t *v = (pv_value_t*) pkg_malloc(sizeof(pv_value_t));
+       if (v != NULL)
+               memset(v, 0, sizeof(pv_value_t));
+       return v;
+}
+
+#define KEY_SAFE(C)  ((C >= 'a' && C <= 'z') || \
+                                         (C >= 'A' && C <= 'Z') || \
+                                         (C >= '0' && C <= '9') || \
+                                         (C == '-' || C == '~'  || C == '_'))
+
+#define HI4(C) (C>>4)
+#define LO4(C) (C & 0x0F)
+
+#define hexint(C) (C < 10?('0' + C):('A'+ C - 10))
+
+char *nsq_util_encode(const str * key, char *dest) {
+       if ((key->len == 1) && (key->s[0] == '#' || key->s[0] == '*')) {
+               *dest++ = key->s[0];
+               return dest;
+       }
+       char *p, *end;
+       for (p = key->s, end = key->s + key->len; p < end; p++) {
+               if (KEY_SAFE(*p)) {
+                       *dest++ = *p;
+               } else if (*p == '.') {
+                       memcpy(dest, "\%2E", 3);
+                       dest += 3;
+               } else if (*p == ' ') {
+                       *dest++ = '+';
+               } else {
+                       *dest++ = '%';
+                       sprintf(dest, "%c%c", hexint(HI4(*p)), hexint(LO4(*p)));
+                       dest += 2;
+               }
+       }
+       *dest = '\0';
+       return dest;
+}
+
+int nsq_encode_ex(str *unencoded, pv_value_p dst_val)
+{
+       char routing_key_buff[256];
+       memset(routing_key_buff,0, sizeof(routing_key_buff));
+       nsq_util_encode(unencoded, routing_key_buff);
+
+       int len = strlen(routing_key_buff);
+       dst_val->rs.s = pkg_malloc(len + 1);
+       memcpy(dst_val->rs.s, routing_key_buff, len);
+       dst_val->rs.s[len] = '\0';
+       dst_val->rs.len = len;
+       dst_val->flags = PV_VAL_STR | PV_VAL_PKG;
+
+       return 1;
+
+}
+
+/*!
+ * \brief Evaluate NSQ transformations
+ * \param msg SIP message
+ * \param tp transformation
+ * \param subtype transformation type
+ * \param val pseudo-variable
+ * \return 0 on success, -1 on error
+ */
+int nsq_tr_eval(struct sip_msg *msg, tr_param_t *tp, int subtype, pv_value_t *val)
+{
+
+       str sv;
+       pv_value_t *pv;
+       pv_value_t v;
+       str v2 = {0,0};
+       void* v1 = NULL;
+
+       if (val==NULL || (val->flags&PV_VAL_NULL))
+               return -1;
+
+
+       nsq_tr_set_crt_buffer();
+
+       switch (subtype) {
+               case TR_NSQ_ENCODE:
+                       if (!(val->flags&PV_VAL_STR))
+                               return -1;
+
+                       pv = nsq_alloc_pv_value();
+                       if (pv == NULL) {
+                               LM_ERR("NSQ encode transform : no more private memory\n");
+                               return -1;
+                       }
+
+                       if (nsq_encode_ex(&val->rs, pv ) != 1) {
+                               LM_ERR("error encoding value\n");
+                               nsq_destroy_pv_value(pv);
+                               return -1;
+                       }
+
+                       strncpy(_nsq_tr_buffer, pv->rs.s, pv->rs.len);
+                       _nsq_tr_buffer[pv->rs.len] = '\0';
+
+                       val->flags = PV_VAL_STR;
+                       val->ri = 0;
+                       val->rs.s = _nsq_tr_buffer;
+                       val->rs.len = pv->rs.len;
+
+                       nsq_destroy_pv_value(pv);
+                       nsq_free_pv_value(val);
+
+                       break;
+               case TR_NSQ_JSON:
+                       if (!(val->flags&PV_VAL_STR))
+                               return -1;
+
+                       if (tp == NULL) {
+                               LM_ERR("NSQ json transform invalid parameter\n");
+                               return -1;
+                       }
+
+                       pv = nsq_alloc_pv_value();
+                       if (pv == NULL) {
+                               LM_ERR("NSQ encode transform : no more private memory\n");
+                               return -1;
+                       }
+
+
+                       if (tp->type == TR_PARAM_STRING) {
+                               v1 = tp->v.s.s;
+                               if (fixup_spve_null(&v1, 1) != 0) {
+                                       LM_ERR("cannot get spve_value from TR_PARAM_STRING : %.*s\n", tp->v.s.len, tp->v.s.s);
+                                       return -1;
+                               }
+                               if (fixup_get_svalue(msg, (gparam_p)v1, &v2) != 0) {
+                                       LM_ERR("cannot get value from TR_PARAM_STRING\n");
+                                       fixup_free_spve_null(&v1, 1);
+                                       return -1;
+                               }
+                               fixup_free_spve_null(&v1, 1);
+                               sv = v2;
+                       } else {
+                               if (pv_get_spec_value(msg, (pv_spec_p)tp->v.data, &v)!=0
+                                               || (!(v.flags&PV_VAL_STR)) || v.rs.len<=0) {
+                                       LM_ERR("value cannot get spec value in json transform\n");
+                                       nsq_destroy_pv_value(pv);
+                                       return -1;
+                               }
+                               sv = v.rs;
+                       }
+
+
+                       if (nsq_json_get_field_ex(&val->rs, &sv, pv ) != 1) {
+                               LM_ERR("error getting json\n");
+                               nsq_destroy_pv_value(pv);
+                               return -1;
+                       }
+
+                       strncpy(_nsq_tr_buffer, pv->rs.s, pv->rs.len);
+                       _nsq_tr_buffer[pv->rs.len] = '\0';
+
+                       val->flags = PV_VAL_STR;
+                       val->ri = 0;
+                       val->rs.s = _nsq_tr_buffer;
+                       val->rs.len = pv->rs.len;
+
+                       nsq_destroy_pv_value(pv);
+                       nsq_free_pv_value(val);
+
+                       break;
+
+               default:
+                       LM_ERR("unknown NSQ transformation subtype %d\n", subtype);
+                       return -1;
+       }
+       return 0;
+}
+
+#define _nsq_tr_parse_sparam(_p, _p0, _tp, _spec, _ps, _in, _s) \
+       while(is_in_str(_p, _in) && (*_p==' ' || *_p=='\t' || *_p=='\n')) _p++; \
+       if(*_p==PV_MARKER) \
+       { /* pseudo-variable */ \
+               _spec = (pv_spec_t*)malloc(sizeof(pv_spec_t)); \
+               if(_spec==NULL) \
+               { \
+                       LM_ERR("no more private memory!\n"); \
+                       goto error; \
+               } \
+               _s.s = _p; _s.len = _in->s + _in->len - _p; \
+               _p0 = pv_parse_spec(&_s, _spec); \
+               if(_p0==NULL) \
+               { \
+                       LM_ERR("invalid spec in substr transformation: %.*s!\n", \
+                               _in->len, _in->s); \
+                       goto error; \
+               } \
+               _p = _p0; \
+               _tp = (tr_param_t*)malloc(sizeof(tr_param_t)); \
+               if(_tp==NULL) \
+               { \
+                       LM_ERR("no more private memory!\n"); \
+                       goto error; \
+               } \
+               memset(_tp, 0, sizeof(tr_param_t)); \
+               _tp->type = TR_PARAM_SPEC; \
+               _tp->v.data = (void*)_spec; \
+               _nsq_parse_specs[_nsq_tr_parse_spec++] = _spec; \
+               _nsq_parse_params[_nsq_tr_parse_params++] = _tp; \
+       } else { /* string */ \
+               _ps = _p; \
+               while(is_in_str(_p, _in) && *_p!='\t' && *_p!='\n' \
+                               && *_p!=TR_PARAM_MARKER && *_p!=TR_RBRACKET) \
+                               _p++; \
+               if(*_p=='\0') \
+               { \
+                       LM_ERR("invalid param in transformation: %.*s!!\n", \
+                               _in->len, _in->s); \
+                       goto error; \
+               } \
+               _tp = (tr_param_t*)malloc(sizeof(tr_param_t)); \
+               if(_tp==NULL) \
+               { \
+                       LM_ERR("no more private memory!\n"); \
+                       goto error; \
+               } \
+               memset(_tp, 0, sizeof(tr_param_t)); \
+               _tp->type = TR_PARAM_STRING; \
+               _tp->v.s.len = _p - _ps; \
+               _tp->v.s.s = (char*)malloc((tp->v.s.len+1)*sizeof(char)); \
+               strncpy(_tp->v.s.s, _ps, tp->v.s.len); \
+               _tp->v.s.s[tp->v.s.len] = '\0'; \
+               _nsq_parse_params[_nsq_tr_parse_params++] = _tp; \
+       }
+
+
+/*!
+ * \brief Helper fuction to parse a NSQ transformation
+ * \param in parsed string
+ * \param t transformation
+ * \return pointer to the end of the transformation in the string - '}', null on error
+ */
+char* nsq_tr_parse(str* in, trans_t *t)
+{
+       char *p;
+       char *p0;
+       char *ps;
+       str name;
+       str s;
+       pv_spec_t *spec = NULL;
+       tr_param_t *tp = NULL;
+
+       if(in==NULL || t==NULL)
+               return NULL;
+
+       p = in->s;
+       name.s = in->s;
+       t->type = TR_NSQ;
+       t->trf = nsq_tr_eval;
+
+       /* find next token */
+       while(is_in_str(p, in) && *p!=TR_PARAM_MARKER && *p!=TR_RBRACKET) p++;
+       if (*p=='\0') {
+               LM_ERR("invalid transformation: %.*s\n",
+                               in->len, in->s);
+               goto error;
+       }
+       name.len = p - name.s;
+       trim(&name);
+
+       if (name.len==6 && strncasecmp(name.s, "encode", 6)==0) {
+               t->subtype = TR_NSQ_ENCODE;
+               goto done;
+       } else if (name.len==4 && strncasecmp(name.s, "json", 4)==0) {
+               t->subtype = TR_NSQ_JSON;
+               if (*p!=TR_PARAM_MARKER) {
+                       LM_ERR("invalid json transformation: %.*s!\n", in->len, in->s);
+                       goto error;
+               }
+               p++;
+               _nsq_tr_parse_sparam(p, p0, tp, spec, ps, in, s);
+               t->params = tp;
+               tp = 0;
+               while(*p && (*p==' ' || *p=='\t' || *p=='\n')) p++;
+               if (*p!=TR_RBRACKET) {
+                       LM_ERR("invalid json transformation: %.*s!!\n",
+                               in->len, in->s);
+                       goto error;
+               }
+               goto done;
+       }
+
+       LM_ERR("unknown NSQ transformation: %.*s/%.*s/%d!\n", in->len, in->s,
+                       name.len, name.s, name.len);
+error:
+       if(tp)
+               free(tp);
+       if(spec)
+               free(spec);
+       return NULL;
+done:
+       t->name = name;
+       return p;
+}
diff --git a/modules/nsq/nsq_trans.h b/modules/nsq/nsq_trans.h
new file mode 100644 (file)
index 0000000..ff40158
--- /dev/null
@@ -0,0 +1,40 @@
+/*
+ *
+ * This file is part of Kamailio, a free SIP server.
+ *
+ * Kamailio is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation; either version 2 of the License, or
+ * (at your option) any later version
+ *
+ * Kamailio is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program; if not, write to the Free Software
+ * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
+ *
+ */
+
+/*! \file
+ * \brief Transformations support
+ */
+
+#ifndef _NSQ_TRANS_H_
+#define _NSQ_TRANS_H_
+
+#include "../../pvar.h"
+
+
+enum _nsq_tr_type { TR_NONE=0, TR_NSQ };
+enum _nsq_tr_subtype { TR_NSQ_NONE=0, TR_NSQ_ENCODE, TR_NSQ_JSON };
+
+char *nsq_tr_parse(str *in, trans_t *tr);
+
+int nsq_tr_init_buffers(void);
+void nsq_tr_clear_buffers(void);
+
+
+#endif
diff --git a/modules/nsq/reader.c b/modules/nsq/reader.c
new file mode 100644 (file)
index 0000000..43ac23e
--- /dev/null
@@ -0,0 +1,172 @@
+#include "nsq.h"
+#include "utlist.h"
+#include "http.h"
+#include "../../dprint.h"
+
+extern int nsq_max_in_flight;
+
+static void nsq_reader_connect_cb(struct NSQDConnection *conn, void *arg)
+{
+    struct NSQReader *rdr = (struct NSQReader *)arg;
+
+    if (rdr->connect_callback) {
+        rdr->connect_callback(rdr, conn);
+    }
+
+    // subscribe
+    buffer_reset(conn->command_buf);
+    nsq_subscribe(conn->command_buf, rdr->topic, rdr->channel);
+    buffered_socket_write_buffer(conn->bs, conn->command_buf);
+
+    // send initial RDY
+    buffer_reset(conn->command_buf);
+    nsq_ready(conn->command_buf, rdr->max_in_flight);
+    buffered_socket_write_buffer(conn->bs, conn->command_buf);
+}
+
+static void nsq_reader_msg_cb(struct NSQDConnection *conn, struct NSQMessage *msg, void *arg)
+{
+    struct NSQReader *rdr = (struct NSQReader *)arg;
+
+       //LM_ERR("nsq_reader_msg_cb()!\n");
+    if (rdr->msg_callback) {
+        msg->id[sizeof(msg->id)-1] = '\0';
+        rdr->msg_callback(rdr, conn, msg, rdr->ctx);
+    }
+}
+
+static void nsq_reader_close_cb(struct NSQDConnection *conn, void *arg)
+{
+    struct NSQReader *rdr = (struct NSQReader *)arg;
+
+       //LM_ERR("nsq_reader_close_cb()!\n");
+
+    if (rdr->close_callback) {
+        rdr->close_callback(rdr, conn);
+    }
+
+    LL_DELETE(rdr->conns, conn);
+
+    free_nsqd_connection(conn);
+}
+
+void nsq_lookupd_request_cb(struct HttpRequest *req, struct HttpResponse *resp, void *arg);
+
+static void nsq_reader_lookupd_poll_cb(EV_P_ struct ev_timer *w, int revents)
+{
+    struct NSQReader *rdr = (struct NSQReader *)w->data;
+    struct NSQLookupdEndpoint *nsqlookupd_endpoint;
+    struct HttpRequest *req;
+    int i, idx, count = 0;
+    char buf[256];
+
+       //LM_ERR("nsq_reader_lookupd_poll_cb()!\n");
+
+    LL_FOREACH(rdr->lookupd, nsqlookupd_endpoint) {
+        count++;
+    }
+    if (count == 0)
+       idx = 0;
+    else
+       idx = rand() % count;
+
+
+    i = 0;
+    LL_FOREACH(rdr->lookupd, nsqlookupd_endpoint) {
+        if (i++ == idx) {
+            sprintf(buf, "http://%s:%d/lookup?topic=%s", nsqlookupd_endpoint->address,
+                nsqlookupd_endpoint->port, rdr->topic);
+                       //LM_ERR("buf %s\n", buf);
+            req = new_http_request(buf, nsq_lookupd_request_cb, rdr, NULL);
+            http_client_get((struct HttpClient *)rdr->httpc, req);
+            break;
+        }
+    }
+
+    ev_timer_again(rdr->loop, &rdr->lookupd_poll_timer);
+}
+
+struct NSQReader* new_nsq_reader(struct ev_loop *loop, const char *topic, const char *channel, void *ctx,
+    void (*connect_callback)(struct NSQReader *rdr, struct NSQDConnection *conn),
+    void (*close_callback)(struct NSQReader *rdr, struct NSQDConnection *conn),
+    void (*msg_callback)(struct NSQReader *rdr, struct NSQDConnection *conn, struct NSQMessage *msg, void *ctx))
+{
+    struct NSQReader *rdr;
+
+    rdr = (struct NSQReader *)malloc(sizeof(struct NSQReader));
+    rdr->topic = strdup(topic);
+    rdr->channel = strdup(channel);
+    rdr->max_in_flight = nsq_max_in_flight;
+    rdr->connect_callback = connect_callback;
+    rdr->close_callback = close_callback;
+    rdr->msg_callback = msg_callback;
+    rdr->ctx = ctx;
+    rdr->conns = NULL;
+    rdr->lookupd = NULL;
+    rdr->loop = loop;
+
+    rdr->httpc = new_http_client(rdr->loop);
+
+       //LM_ERR("new_nsq_reader(), nsq_max_in_flight = %d!\n", nsq_max_in_flight);
+
+    // TODO: configurable interval
+    ev_timer_init(&rdr->lookupd_poll_timer, nsq_reader_lookupd_poll_cb, 0., 5.);
+    rdr->lookupd_poll_timer.data = rdr;
+    ev_timer_again(rdr->loop, &rdr->lookupd_poll_timer);
+
+    return rdr;
+}
+
+void free_nsq_reader(struct NSQReader *rdr)
+{
+    struct NSQDConnection *conn;
+    struct NSQLookupdEndpoint *nsqlookupd_endpoint;
+
+       //LM_ERR("free_nsq_reader()!\n");
+
+    if (rdr) {
+        // TODO: this should probably trigger disconnections and then keep
+        // trying to clean up until everything upstream is finished
+        LL_FOREACH(rdr->conns, conn) {
+            nsqd_connection_disconnect(conn);
+        }
+        LL_FOREACH(rdr->lookupd, nsqlookupd_endpoint) {
+            free_nsqlookupd_endpoint(nsqlookupd_endpoint);
+        }
+        free(rdr->topic);
+        free(rdr->channel);
+        free(rdr);
+    }
+}
+
+int nsq_reader_add_nsqlookupd_endpoint(struct NSQReader *rdr, const char *address, int port)
+{
+    struct NSQLookupdEndpoint *nsqlookupd_endpoint;
+
+       //LM_ERR("nsq_reader_add_nsqlookupd_endpoint(address = %s, port = %d)!\n", address, port);
+    nsqlookupd_endpoint = new_nsqlookupd_endpoint(address, port);
+    LL_APPEND(rdr->lookupd, nsqlookupd_endpoint);
+
+    return 1;
+}
+
+int nsq_reader_connect_to_nsqd(struct NSQReader *rdr, const char *address, int port)
+{
+    struct NSQDConnection *conn;
+    int rc;
+
+       //LM_ERR("nsq_reader_connect_to_nsqd()!\n");
+    conn = new_nsqd_connection(rdr->loop, address, port, nsq_reader_connect_cb, nsq_reader_close_cb, nsq_reader_msg_cb, rdr);
+    rc = nsqd_connection_connect(conn);
+    if (rc > 0) {
+        LL_APPEND(rdr->conns, conn);
+    }
+    return rc;
+}
+
+void nsq_run(struct ev_loop *loop)
+{
+       //LM_ERR("nsq_run()!\n");
+    srand(time(NULL));
+    ev_loop(loop, 0);
+}
diff --git a/modules/nsq/utlist.h b/modules/nsq/utlist.h
new file mode 100644 (file)
index 0000000..6bccec7
--- /dev/null
@@ -0,0 +1,728 @@
+/*
+Copyright (c) 2007-2013, Troy D. Hanson   http://troydhanson.github.com/uthash/
+All rights reserved.
+
+Redistribution and use in source and binary forms, with or without
+modification, are permitted provided that the following conditions are met:
+
+    * Redistributions of source code must retain the above copyright
+      notice, this list of conditions and the following disclaimer.
+
+THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS
+IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED
+TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A
+PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER
+OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
+EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
+PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
+PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
+LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
+NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
+SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+*/
+
+#ifndef UTLIST_H
+#define UTLIST_H
+
+#define UTLIST_VERSION 1.9.8
+
+#include <assert.h>
+
+/* 
+ * This file contains macros to manipulate singly and doubly-linked lists.
+ *
+ * 1. LL_ macros:  singly-linked lists.
+ * 2. DL_ macros:  doubly-linked lists.
+ * 3. CDL_ macros: circular doubly-linked lists.
+ *
+ * To use singly-linked lists, your structure must have a "next" pointer.
+ * To use doubly-linked lists, your structure must "prev" and "next" pointers.
+ * Either way, the pointer to the head of the list must be initialized to NULL.
+ * 
+ * ----------------.EXAMPLE -------------------------
+ * struct item {
+ *      int id;
+ *      struct item *prev, *next;
+ * }
+ *
+ * struct item *list = NULL:
+ *
+ * int main() {
+ *      struct item *item;
+ *      ... allocate and populate item ...
+ *      DL_APPEND(list, item);
+ * }
+ * --------------------------------------------------
+ *
+ * For doubly-linked lists, the append and delete macros are O(1)
+ * For singly-linked lists, append and delete are O(n) but prepend is O(1)
+ * The sort macro is O(n log(n)) for all types of single/double/circular lists.
+ */
+
+/* These macros use decltype or the earlier __typeof GNU extension.
+   As decltype is only available in newer compilers (VS2010 or gcc 4.3+
+   when compiling c++ code), this code uses whatever method is needed
+   or, for VS2008 where neither is available, uses casting workarounds. */
+#ifdef _MSC_VER            /* MS compiler */
+#if _MSC_VER >= 1600 && defined(__cplusplus)  /* VS2010 or newer in C++ mode */
+#define LDECLTYPE(x) decltype(x)
+#else                     /* VS2008 or older (or VS2010 in C mode) */
+#define NO_DECLTYPE
+#define LDECLTYPE(x) char*
+#endif
+#else                      /* GNU, Sun and other compilers */
+#define LDECLTYPE(x) __typeof(x)
+#endif
+
+/* for VS2008 we use some workarounds to get around the lack of decltype,
+ * namely, we always reassign our tmp variable to the list head if we need
+ * to dereference its prev/next pointers, and save/restore the real head.*/
+#ifdef NO_DECLTYPE
+#define _SV(elt,list) _tmp = (char*)(list); {char **_alias = (char**)&(list); *_alias = (elt); }
+#define _NEXT(elt,list,next) ((char*)((list)->next))
+#define _NEXTASGN(elt,list,to,next) { char **_alias = (char**)&((list)->next); *_alias=(char*)(to); }
+/* #define _PREV(elt,list,prev) ((char*)((list)->prev)) */
+#define _PREVASGN(elt,list,to,prev) { char **_alias = (char**)&((list)->prev); *_alias=(char*)(to); }
+#define _RS(list) { char **_alias = (char**)&(list); *_alias=_tmp; }
+#define _CASTASGN(a,b) { char **_alias = (char**)&(a); *_alias=(char*)(b); }
+#else 
+#define _SV(elt,list)
+#define _NEXT(elt,list,next) ((elt)->next)
+#define _NEXTASGN(elt,list,to,next) ((elt)->next)=(to)
+/* #define _PREV(elt,list,prev) ((elt)->prev) */
+#define _PREVASGN(elt,list,to,prev) ((elt)->prev)=(to)
+#define _RS(list)
+#define _CASTASGN(a,b) (a)=(b)
+#endif
+
+/******************************************************************************
+ * The sort macro is an adaptation of Simon Tatham's O(n log(n)) mergesort    *
+ * Unwieldy variable names used here to avoid shadowing passed-in variables.  *
+ *****************************************************************************/
+#define LL_SORT(list, cmp)                                                                     \
+    LL_SORT2(list, cmp, next)
+
+#define LL_SORT2(list, cmp, next)                                                              \
+do {                                                                                           \
+  LDECLTYPE(list) _ls_p;                                                                       \
+  LDECLTYPE(list) _ls_q;                                                                       \
+  LDECLTYPE(list) _ls_e;                                                                       \
+  LDECLTYPE(list) _ls_tail;                                                                    \
+  int _ls_insize, _ls_nmerges, _ls_psize, _ls_qsize, _ls_i, _ls_looping;                       \
+  if (list) {                                                                                  \
+    _ls_insize = 1;                                                                            \
+    _ls_looping = 1;                                                                           \
+    while (_ls_looping) {                                                                      \
+      _CASTASGN(_ls_p,list);                                                                   \
+      list = NULL;                                                                             \
+      _ls_tail = NULL;                                                                         \
+      _ls_nmerges = 0;                                                                         \
+      while (_ls_p) {                                                                          \
+        _ls_nmerges++;                                                                         \
+        _ls_q = _ls_p;                                                                         \
+        _ls_psize = 0;                                                                         \
+        for (_ls_i = 0; _ls_i < _ls_insize; _ls_i++) {                                         \
+          _ls_psize++;                                                                         \
+          _SV(_ls_q,list); _ls_q = _NEXT(_ls_q,list,next); _RS(list);                          \
+          if (!_ls_q) break;                                                                   \
+        }                                                                                      \
+        _ls_qsize = _ls_insize;                                                                \
+        while (_ls_psize > 0 || (_ls_qsize > 0 && _ls_q)) {                                    \
+          if (_ls_psize == 0) {                                                                \
+            _ls_e = _ls_q; _SV(_ls_q,list); _ls_q =                                            \
+              _NEXT(_ls_q,list,next); _RS(list); _ls_qsize--;                                  \
+          } else if (_ls_qsize == 0 || !_ls_q) {                                               \
+            _ls_e = _ls_p; _SV(_ls_p,list); _ls_p =                                            \
+              _NEXT(_ls_p,list,next); _RS(list); _ls_psize--;                                  \
+          } else if (cmp(_ls_p,_ls_q) <= 0) {                                                  \
+            _ls_e = _ls_p; _SV(_ls_p,list); _ls_p =                                            \
+              _NEXT(_ls_p,list,next); _RS(list); _ls_psize--;                                  \
+          } else {                                                                             \
+            _ls_e = _ls_q; _SV(_ls_q,list); _ls_q =                                            \
+              _NEXT(_ls_q,list,next); _RS(list); _ls_qsize--;                                  \
+          }                                                                                    \
+          if (_ls_tail) {                                                                      \
+            _SV(_ls_tail,list); _NEXTASGN(_ls_tail,list,_ls_e,next); _RS(list);                \
+          } else {                                                                             \
+            _CASTASGN(list,_ls_e);                                                             \
+          }                                                                                    \
+          _ls_tail = _ls_e;                                                                    \
+        }                                                                                      \
+        _ls_p = _ls_q;                                                                         \
+      }                                                                                        \
+      if (_ls_tail) {                                                                          \
+        _SV(_ls_tail,list); _NEXTASGN(_ls_tail,list,NULL,next); _RS(list);                     \
+      }                                                                                        \
+      if (_ls_nmerges <= 1) {                                                                  \
+        _ls_looping=0;                                                                         \
+      }                                                                                        \
+      _ls_insize *= 2;                                                                         \
+    }                                                                                          \
+  }                                                                                            \
+} while (0)
+
+
+#define DL_SORT(list, cmp)                                                                     \
+    DL_SORT2(list, cmp, prev, next)
+
+#define DL_SORT2(list, cmp, prev, next)                                                        \
+do {                                                                                           \
+  LDECLTYPE(list) _ls_p;                                                                       \
+  LDECLTYPE(list) _ls_q;                                                                       \
+  LDECLTYPE(list) _ls_e;                                                                       \
+  LDECLTYPE(list) _ls_tail;                                                                    \
+  int _ls_insize, _ls_nmerges, _ls_psize, _ls_qsize, _ls_i, _ls_looping;                       \
+  if (list) {                                                                                  \
+    _ls_insize = 1;                                                                            \
+    _ls_looping = 1;                                                                           \
+    while (_ls_looping) {                                                                      \
+      _CASTASGN(_ls_p,list);                                                                   \
+      list = NULL;                                                                             \
+      _ls_tail = NULL;                                                                         \
+      _ls_nmerges = 0;                                                                         \
+      while (_ls_p) {                                                                          \
+        _ls_nmerges++;                                                                         \
+        _ls_q = _ls_p;                                                                         \
+        _ls_psize = 0;                                                                         \
+        for (_ls_i = 0; _ls_i < _ls_insize; _ls_i++) {                                         \
+          _ls_psize++;                                                                         \
+          _SV(_ls_q,list); _ls_q = _NEXT(_ls_q,list,next); _RS(list);                          \
+          if (!_ls_q) break;                                                                   \
+        }                                                                                      \
+        _ls_qsize = _ls_insize;                                                                \
+        while (_ls_psize > 0 || (_ls_qsize > 0 && _ls_q)) {                                    \
+          if (_ls_psize == 0) {                                                                \
+            _ls_e = _ls_q; _SV(_ls_q,list); _ls_q =                                            \
+              _NEXT(_ls_q,list,next); _RS(list); _ls_qsize--;                                  \
+          } else if (_ls_qsize == 0 || !_ls_q) {                                               \
+            _ls_e = _ls_p; _SV(_ls_p,list); _ls_p =                                            \
+              _NEXT(_ls_p,list,next); _RS(list); _ls_psize--;                                  \
+          } else if (cmp(_ls_p,_ls_q) <= 0) {                                                  \
+            _ls_e = _ls_p; _SV(_ls_p,list); _ls_p =                                            \
+              _NEXT(_ls_p,list,next); _RS(list); _ls_psize--;                                  \
+          } else {                                                                             \
+            _ls_e = _ls_q; _SV(_ls_q,list); _ls_q =                                            \
+              _NEXT(_ls_q,list,next); _RS(list); _ls_qsize--;                                  \
+          }                                                                                    \
+          if (_ls_tail) {                                                                      \
+            _SV(_ls_tail,list); _NEXTASGN(_ls_tail,list,_ls_e,next); _RS(list);                \
+          } else {                                                                             \
+            _CASTASGN(list,_ls_e);                                                             \
+          }                                                                                    \
+          _SV(_ls_e,list); _PREVASGN(_ls_e,list,_ls_tail,prev); _RS(list);                     \
+          _ls_tail = _ls_e;                                                                    \
+        }                                                                                      \
+        _ls_p = _ls_q;                                                                         \
+      }                                                                                        \
+      _CASTASGN(list->prev, _ls_tail);                                                         \
+      _SV(_ls_tail,list); _NEXTASGN(_ls_tail,list,NULL,next); _RS(list);                       \
+      if (_ls_nmerges <= 1) {                                                                  \
+        _ls_looping=0;                                                                         \
+      }                                                                                        \
+      _ls_insize *= 2;                                                                         \
+    }                                                                                          \
+  }                                                                                            \
+} while (0)
+
+#define CDL_SORT(list, cmp)                                                                    \
+    CDL_SORT2(list, cmp, prev, next)
+
+#define CDL_SORT2(list, cmp, prev, next)                                                       \
+do {                                                                                           \
+  LDECLTYPE(list) _ls_p;                                                                       \
+  LDECLTYPE(list) _ls_q;                                                                       \
+  LDECLTYPE(list) _ls_e;                                                                       \
+  LDECLTYPE(list) _ls_tail;                                                                    \
+  LDECLTYPE(list) _ls_oldhead;                                                                 \
+  LDECLTYPE(list) _tmp;                                                                        \
+  int _ls_insize, _ls_nmerges, _ls_psize, _ls_qsize, _ls_i, _ls_looping;                       \
+  if (list) {                                                                                  \
+    _ls_insize = 1;                                                                            \
+    _ls_looping = 1;                                                                           \
+    while (_ls_looping) {                                                                      \
+      _CASTASGN(_ls_p,list);                                                                   \
+      _CASTASGN(_ls_oldhead,list);                                                             \
+      list = NULL;                                                                             \
+      _ls_tail = NULL;                                                                         \
+      _ls_nmerges = 0;                                                                         \
+      while (_ls_p) {                                                                          \
+        _ls_nmerges++;                                                                         \
+        _ls_q = _ls_p;                                                                         \
+        _ls_psize = 0;                                                                         \
+        for (_ls_i = 0; _ls_i < _ls_insize; _ls_i++) {                                         \
+          _ls_psize++;                                                                         \
+          _SV(_ls_q,list);                                                                     \
+          if (_NEXT(_ls_q,list,next) == _ls_oldhead) {                                         \
+            _ls_q = NULL;                                                                      \
+          } else {                                                                             \
+            _ls_q = _NEXT(_ls_q,list,next);                                                    \
+          }                                                                                    \
+          _RS(list);                                                                           \
+          if (!_ls_q) break;                                                                   \
+        }                                                                                      \
+        _ls_qsize = _ls_insize;                                                                \
+        while (_ls_psize > 0 || (_ls_qsize > 0 && _ls_q)) {                                    \
+          if (_ls_psize == 0) {                                                                \
+            _ls_e = _ls_q; _SV(_ls_q,list); _ls_q =                                            \
+              _NEXT(_ls_q,list,next); _RS(list); _ls_qsize--;                                  \
+            if (_ls_q == _ls_oldhead) { _ls_q = NULL; }                                        \
+          } else if (_ls_qsize == 0 || !_ls_q) {                                               \
+            _ls_e = _ls_p; _SV(_ls_p,list); _ls_p =                                            \
+              _NEXT(_ls_p,list,next); _RS(list); _ls_psize--;                                  \
+            if (_ls_p == _ls_oldhead) { _ls_p = NULL; }                                        \
+          } else if (cmp(_ls_p,_ls_q) <= 0) {                                                  \
+            _ls_e = _ls_p; _SV(_ls_p,list); _ls_p =                                            \
+              _NEXT(_ls_p,list,next); _RS(list); _ls_psize--;                                  \
+            if (_ls_p == _ls_oldhead) { _ls_p = NULL; }                                        \
+          } else {                                                                             \
+            _ls_e = _ls_q; _SV(_ls_q,list); _ls_q =                                            \
+              _NEXT(_ls_q,list,next); _RS(list); _ls_qsize--;                                  \
+            if (_ls_q == _ls_oldhead) { _ls_q = NULL; }                                        \
+          }                                                                                    \
+          if (_ls_tail) {                                                                      \
+            _SV(_ls_tail,list); _NEXTASGN(_ls_tail,list,_ls_e,next); _RS(list);                \
+          } else {                                                                             \
+            _CASTASGN(list,_ls_e);                                                             \
+          }                                                                                    \
+          _SV(_ls_e,list); _PREVASGN(_ls_e,list,_ls_tail,prev); _RS(list);                     \
+          _ls_tail = _ls_e;                                                                    \
+        }                                                                                      \
+        _ls_p = _ls_q;                                                                         \
+      }                                                                                        \
+      _CASTASGN(list->prev,_ls_tail);                                                          \
+      _CASTASGN(_tmp,list);                                                                    \
+      _SV(_ls_tail,list); _NEXTASGN(_ls_tail,list,_tmp,next); _RS(list);                       \
+      if (_ls_nmerges <= 1) {                                                                  \
+        _ls_looping=0;                                                                         \
+      }                                                                                        \
+      _ls_insize *= 2;                                                                         \
+    }                                                                                          \
+  }                                                                                            \
+} while (0)
+
+/******************************************************************************
+ * singly linked list macros (non-circular)                                   *
+ *****************************************************************************/
+#define LL_PREPEND(head,add)                                                                   \
+    LL_PREPEND2(head,add,next)
+
+#define LL_PREPEND2(head,add,next)                                                             \
+do {                                                                                           \
+  (add)->next = head;                                                                          \
+  head = add;                                                                                  \
+} while (0)
+
+#define LL_CONCAT(head1,head2)                                                                 \
+    LL_CONCAT2(head1,head2,next)
+
+#define LL_CONCAT2(head1,head2,next)                                                           \
+do {                                                                                           \
+  LDECLTYPE(head1) _tmp;                                                                       \
+  if (head1) {                                                                                 \
+    _tmp = head1;                                                                              \
+    while (_tmp->next) { _tmp = _tmp->next; }                                                  \
+    _tmp->next=(head2);                                                                        \
+  } else {                                                                                     \
+    (head1)=(head2);                                                                           \
+  }                                                                                            \
+} while (0)
+
+#define LL_APPEND(head,add)                                                                    \
+    LL_APPEND2(head,add,next)
+
+#define LL_APPEND2(head,add,next)                                                              \
+do {                                                                                           \
+  LDECLTYPE(head) _tmp;                                                                        \
+  (add)->next=NULL;                                                                            \
+  if (head) {                                                                                  \
+    _tmp = head;                                                                               \
+    while (_tmp->next) { _tmp = _tmp->next; }                                                  \
+    _tmp->next=(add);                                                                          \
+  } else {                                                                                     \
+    (head)=(add);                                                                              \
+  }                                                                                            \
+} while (0)
+
+#define LL_DELETE(head,del)                                                                    \
+    LL_DELETE2(head,del,next)
+
+#define LL_DELETE2(head,del,next)                                                              \
+do {                                                                                           \
+  LDECLTYPE(head) _tmp;                                                                        \
+  if ((head) == (del)) {                                                                       \
+    (head)=(head)->next;                                                                       \
+  } else {                                                                                     \
+    _tmp = head;                                                                               \
+    while (_tmp->next && (_tmp->next != (del))) {                                              \
+      _tmp = _tmp->next;                                                                       \
+    }                                                                                          \
+    if (_tmp->next) {                                                                          \
+      _tmp->next = ((del)->next);                                                              \
+    }                                                                                          \
+  }                                                                                            \
+} while (0)
+
+/* Here are VS2008 replacements for LL_APPEND and LL_DELETE */
+#define LL_APPEND_VS2008(head,add)                                                             \
+    LL_APPEND2_VS2008(head,add,next)
+
+#define LL_APPEND2_VS2008(head,add,next)                                                       \
+do {                                                                                           \
+  if (head) {                                                                                  \
+    (add)->next = head;     /* use add->next as a temp variable */                             \
+    while ((add)->next->next) { (add)->next = (add)->next->next; }                             \
+    (add)->next->next=(add);                                                                   \
+  } else {                                                                                     \
+    (head)=(add);                                                                              \
+  }                                                                                            \
+  (add)->next=NULL;                                                                            \
+} while (0)
+
+#define LL_DELETE_VS2008(head,del)                                                             \
+    LL_DELETE2_VS2008(head,del,next)
+
+#define LL_DELETE2_VS2008(head,del,next)                                                       \
+do {                                                                                           \
+  if ((head) == (del)) {                                                                       \
+    (head)=(head)->next;                                                                       \
+  } else {                                                                                     \
+    char *_tmp = (char*)(head);                                                                \
+    while ((head)->next && ((head)->next != (del))) {                                          \
+      head = (head)->next;                                                                     \
+    }                                                                                          \
+    if ((head)->next) {                                                                        \
+      (head)->next = ((del)->next);                                                            \
+    }                                                                                          \
+    {                                                                                          \
+      char **_head_alias = (char**)&(head);                                                    \
+      *_head_alias = _tmp;                                                                     \
+    }                                                                                          \
+  }                                                                                            \
+} while (0)
+#ifdef NO_DECLTYPE
+#undef LL_APPEND
+#define LL_APPEND LL_APPEND_VS2008
+#undef LL_DELETE
+#define LL_DELETE LL_DELETE_VS2008
+#undef LL_DELETE2
+#define LL_DELETE2_VS2008
+#undef LL_APPEND2
+#define LL_APPEND2 LL_APPEND2_VS2008
+#undef LL_CONCAT /* no LL_CONCAT_VS2008 */
+#undef DL_CONCAT /* no DL_CONCAT_VS2008 */
+#endif
+/* end VS2008 replacements */
+
+#define LL_FOREACH(head,el)                                                                    \
+    LL_FOREACH2(head,el,next)
+
+#define LL_FOREACH2(head,el,next)                                                              \
+    for(el=head;el;el=(el)->next)
+
+#define LL_FOREACH_SAFE(head,el,tmp)                                                           \
+    LL_FOREACH_SAFE2(head,el,tmp,next)
+
+#define LL_FOREACH_SAFE2(head,el,tmp,next)                                                     \
+  for((el)=(head);(el) && (tmp = (el)->next, 1); (el) = tmp)
+
+#define LL_SEARCH_SCALAR(head,out,field,val)                                                   \
+    LL_SEARCH_SCALAR2(head,out,field,val,next)
+
+#define LL_SEARCH_SCALAR2(head,out,field,val,next)                                             \
+do {                                                                                           \
+    LL_FOREACH2(head,out,next) {                                                               \
+      if ((out)->field == (val)) break;                                                        \
+    }                                                                                          \
+} while(0) 
+
+#define LL_SEARCH(head,out,elt,cmp)                                                            \
+    LL_SEARCH2(head,out,elt,cmp,next)
+
+#define LL_SEARCH2(head,out,elt,cmp,next)                                                      \
+do {                                                                                           \
+    LL_FOREACH2(head,out,next) {                                                               \
+      if ((cmp(out,elt))==0) break;                                                            \
+    }                                                                                          \
+} while(0) 
+
+#define LL_REPLACE_ELEM(head, el, add)                                                         \
+do {                                                                                           \
+ LDECLTYPE(head) _tmp;                                                                         \
+ assert(head != NULL);                                                                         \
+ assert(el != NULL);                                                                           \
+ assert(add != NULL);                                                                          \
+ (add)->next = (el)->next;                                                                     \
+ if ((head) == (el)) {                                                                         \
+  (head) = (add);                                                                              \
+ } else {                                                                                      \
+  _tmp = head;                                                                                 \
+  while (_tmp->next && (_tmp->next != (el))) {                                                 \
+   _tmp = _tmp->next;                                                                          \
+  }                                                                                            \
+  if (_tmp->next) {                                                                            \
+    _tmp->next = (add);                                                                        \
+  }                                                                                            \
+ }                                                                                             \
+} while (0)
+
+#define LL_PREPEND_ELEM(head, el, add)                                                         \
+do {                                                                                           \
+ LDECLTYPE(head) _tmp;                                                                         \
+ assert(head != NULL);                                                                         \
+ assert(el != NULL);                                                                           \
+ assert(add != NULL);                                                                          \
+ (add)->next = (el);                                                                           \
+ if ((head) == (el)) {                                                                         \
+  (head) = (add);                                                                              \
+ } else {                                                                                      \
+  _tmp = head;                                                                                 \
+  while (_tmp->next && (_tmp->next != (el))) {                                                 \
+   _tmp = _tmp->next;                                                                          \
+  }                                                                                            \
+  if (_tmp->next) {                                                                            \
+    _tmp->next = (add);                                                                        \
+  }                                                                                            \
+ }                                                                                             \
+} while (0)                                                                                    \
+
+
+/******************************************************************************
+ * doubly linked list macros (non-circular)                                   *
+ *****************************************************************************/
+#define DL_PREPEND(head,add)                                                                   \
+    DL_PREPEND2(head,add,prev,next)
+
+#define DL_PREPEND2(head,add,prev,next)                                                        \
+do {                                                                                           \
+ (add)->next = head;                                                                           \
+ if (head) {                                                                                   \
+   (add)->prev = (head)->prev;                                                                 \
+   (head)->prev = (add);                                                                       \
+ } else {                                                                                      \
+   (add)->prev = (add);                                                                        \
+ }                                                                                             \
+ (head) = (add);                                                                               \
+} while (0)
+
+#define DL_APPEND(head,add)                                                                    \
+    DL_APPEND2(head,add,prev,next)
+
+#define DL_APPEND2(head,add,prev,next)                                                         \
+do {                                                                                           \
+  if (head) {                                                                                  \
+      (add)->prev = (head)->prev;                                                              \
+      (head)->prev->next = (add);                                                              \
+      (head)->prev = (add);                                                                    \
+      (add)->next = NULL;                                                                      \
+  } else {                                                                                     \
+      (head)=(add);                                                                            \
+      (head)->prev = (head);                                                                   \
+      (head)->next = NULL;                                                                     \
+  }                                                                                            \
+} while (0) 
+
+#define DL_CONCAT(head1,head2)                                                                 \
+    DL_CONCAT2(head1,head2,prev,next)
+
+#define DL_CONCAT2(head1,head2,prev,next)                                                      \
+do {                                                                                           \
+  LDECLTYPE(head1) _tmp;                                                                       \
+  if (head2) {                                                                                 \
+    if (head1) {                                                                               \
+        _tmp = (head2)->prev;                                                                  \
+        (head2)->prev = (head1)->prev;                                                         \
+        (head1)->prev->next = (head2);                                                         \
+        (head1)->prev = _tmp;                                                                  \
+    } else {                                                                                   \
+        (head1)=(head2);                                                                       \
+    }                                                                                          \
+  }                                                                                            \
+} while (0) 
+
+#define DL_DELETE(head,del)                                                                    \
+    DL_DELETE2(head,del,prev,next)
+
+#define DL_DELETE2(head,del,prev,next)                                                         \
+do {                                                                                           \
+  assert((del)->prev != NULL);                                                                 \
+  if ((del)->prev == (del)) {                                                                  \
+      (head)=NULL;                                                                             \
+  } else if ((del)==(head)) {                                                                  \
+      (del)->next->prev = (del)->prev;                                                         \
+      (head) = (del)->next;                                                                    \
+  } else {                                                                                     \
+      (del)->prev->next = (del)->next;                                                         \
+      if ((del)->next) {                                                                       \
+          (del)->next->prev = (del)->prev;                                                     \
+      } else {                                                                                 \
+          (head)->prev = (del)->prev;                                                          \
+      }                                                                                        \
+  }                                                                                            \
+} while (0) 
+
+
+#define DL_FOREACH(head,el)                                                                    \
+    DL_FOREACH2(head,el,next)
+
+#define DL_FOREACH2(head,el,next)                                                              \
+    for(el=head;el;el=(el)->next)
+
+/* this version is safe for deleting the elements during iteration */
+#define DL_FOREACH_SAFE(head,el,tmp)                                                           \
+    DL_FOREACH_SAFE2(head,el,tmp,next)
+
+#define DL_FOREACH_SAFE2(head,el,tmp,next)                                                     \
+  for((el)=(head);(el) && (tmp = (el)->next, 1); (el) = tmp)
+
+/* these are identical to their singly-linked list counterparts */
+#define DL_SEARCH_SCALAR LL_SEARCH_SCALAR
+#define DL_SEARCH LL_SEARCH
+#define DL_SEARCH_SCALAR2 LL_SEARCH_SCALAR2
+#define DL_SEARCH2 LL_SEARCH2
+
+#define DL_REPLACE_ELEM(head, el, add)                                                         \
+do {                                                                                           \
+ assert(head != NULL);                                                                         \
+ assert(el != NULL);                                                                           \
+ assert(add != NULL);                                                                          \
+ if ((head) == (el)) {                                                                         \
+  (head) = (add);                                                                              \
+  (add)->next = (el)->next;                                                                    \
+  if ((el)->next == NULL) {                                                                    \
+   (add)->prev = (add);                                                                        \
+  } else {                                                                                     \
+   (add)->prev = (el)->prev;                                                                   \
+   (add)->next->prev = (add);                                                                  \
+  }                                                                                            \
+ } else {                                                                                      \
+  (add)->next = (el)->next;                                                                    \
+  (add)->prev = (el)->prev;                                                                    \
+  (add)->prev->next = (add);                                                                   \
+  if ((el)->next == NULL) {                                                                    \
+   (head)->prev = (add);                                                                       \
+  } else {                                                                                     \
+   (add)->next->prev = (add);                                                                  \
+  }                                                                                            \
+ }                                                                                             \
+} while (0)
+
+#define DL_PREPEND_ELEM(head, el, add)                                                         \
+do {                                                                                           \
+ assert(head != NULL);                                                                         \
+ assert(el != NULL);                                                                           \
+ assert(add != NULL);                                                                          \
+ (add)->next = (el);                                                                           \
+ (add)->prev = (el)->prev;                                                                     \
+ (el)->prev = (add);                                                                           \
+ if ((head) == (el)) {                                                                         \
+  (head) = (add);                                                                              \
+ } else {                                                                                      \
+  (add)->prev->next = (add);                                                                   \
+ }                                                                                             \
+} while (0)                                                                                    \
+
+
+/******************************************************************************
+ * circular doubly linked list macros                                         *
+ *****************************************************************************/
+#define CDL_PREPEND(head,add)                                                                  \
+    CDL_PREPEND2(head,add,prev,next)
+
+#define CDL_PREPEND2(head,add,prev,next)                                                       \
+do {                                                                                           \
+ if (head) {                                                                                   \
+   (add)->prev = (head)->prev;                                                                 \
+   (add)->next = (head);                                                                       \
+   (head)->prev = (add);                                                                       \
+   (add)->prev->next = (add);                                                                  \
+ } else {                                                                                      \
+   (add)->prev = (add);                                                                        \
+   (add)->next = (add);                                                                        \
+ }                                                                                             \
+(head)=(add);                                                                                  \
+} while (0)
+
+#define CDL_DELETE(head,del)                                                                   \
+    CDL_DELETE2(head,del,prev,next)
+
+#define CDL_DELETE2(head,del,prev,next)                                                        \
+do {                                                                                           \
+  if ( ((head)==(del)) && ((head)->next == (head))) {                                          \
+      (head) = 0L;                                                                             \
+  } else {                                                                                     \
+     (del)->next->prev = (del)->prev;                                                          \
+     (del)->prev->next = (del)->next;                                                          \
+     if ((del) == (head)) (head)=(del)->next;                                                  \
+  }                                                                                            \
+} while (0) 
+
+#define CDL_FOREACH(head,el)                                                                   \
+    CDL_FOREACH2(head,el,next)
+
+#define CDL_FOREACH2(head,el,next)                                                             \
+    for(el=head;el;el=((el)->next==head ? 0L : (el)->next)) 
+
+#define CDL_FOREACH_SAFE(head,el,tmp1,tmp2)                                                    \
+    CDL_FOREACH_SAFE2(head,el,tmp1,tmp2,prev,next)
+
+#define CDL_FOREACH_SAFE2(head,el,tmp1,tmp2,prev,next)                                         \
+  for((el)=(head), ((tmp1)=(head)?((head)->prev):NULL);                                        \
+      (el) && ((tmp2)=(el)->next, 1);                                                          \
+      ((el) = (((el)==(tmp1)) ? 0L : (tmp2))))
+
+#define CDL_SEARCH_SCALAR(head,out,field,val)                                                  \
+    CDL_SEARCH_SCALAR2(head,out,field,val,next)
+
+#define CDL_SEARCH_SCALAR2(head,out,field,val,next)                                            \
+do {                                                                                           \
+    CDL_FOREACH2(head,out,next) {                                                              \
+      if ((out)->field == (val)) break;                                                        \
+    }                                                                                          \
+} while(0) 
+
+#define CDL_SEARCH(head,out,elt,cmp)                                                           \
+    CDL_SEARCH2(head,out,elt,cmp,next)
+
+#define CDL_SEARCH2(head,out,elt,cmp,next)                                                     \
+do {                                                                                           \
+    CDL_FOREACH2(head,out,next) {                                                              \
+      if ((cmp(out,elt))==0) break;                                                            \
+    }                                                                                          \
+} while(0) 
+
+#define CDL_REPLACE_ELEM(head, el, add)                                                        \
+do {                                                                                           \
+ assert(head != NULL);                                                                         \
+ assert(el != NULL);                                                                           \
+ assert(add != NULL);                                                                          \
+ if ((el)->next == (el)) {                                                                     \
+  (add)->next = (add);                                                                         \
+  (add)->prev = (add);                                                                         \
+  (head) = (add);                                                                              \
+ } else {                                                                                      \
+  (add)->next = (el)->next;                                                                    \
+  (add)->prev = (el)->prev;                                                                    \
+  (add)->next->prev = (add);                                                                   \
+  (add)->prev->next = (add);                                                                   \
+  if ((head) == (el)) {                                                                        \
+   (head) = (add);                                                                             \
+  }                                                                                            \
+ }                                                                                             \
+} while (0)
+
+#define CDL_PREPEND_ELEM(head, el, add)                                                        \
+do {                                                                                           \
+ assert(head != NULL);                                                                         \
+ assert(el != NULL);                                                                           \
+ assert(add != NULL);                                                                          \
+ (add)->next = (el);                                                                           \
+ (add)->prev = (el)->prev;                                                                     \
+ (el)->prev = (add);                                                                           \
+ (add)->prev->next = (add);                                                                    \
+ if ((head) == (el)) {                                                                         \
+  (head) = (add);                                                                              \
+ }                                                                                             \
+} while (0)                                                                                    \
+
+#endif /* UTLIST_H */
+