modules/websocket: more work on module boiler-plate and handshake
authorPeter Dunkley <peter.dunkley@crocodile-rcs.com>
Sat, 16 Jun 2012 13:38:22 +0000 (14:38 +0100)
committerPeter Dunkley <peter.dunkley@crocodile-rcs.com>
Sat, 16 Jun 2012 13:38:22 +0000 (14:38 +0100)
- Handshake now works with Google Chrome

modules/websocket/Makefile
modules/websocket/example/kamailio.cfg
modules/websocket/ws_frame.c [new file with mode: 0644]
modules/websocket/ws_frame.h [new file with mode: 0644]
modules/websocket/ws_handshake.c
modules/websocket/ws_handshake.h
modules/websocket/ws_mod.c
modules/websocket/ws_mod.h

index 36543a0..dade653 100644 (file)
@@ -22,6 +22,7 @@ DEFS+=-DOPENSER_MOD_INTERFACE
 
 SERLIBPATH=../../lib
 SER_LIBS+=$(SERLIBPATH)/kcore/kcore
+SER_LIBS+=$(SERLIBPATH)/kmi/kmi
 
 include ../../Makefile.modules
 
index 4b772b1..ba20abb 100644 (file)
@@ -40,6 +40,7 @@ loadmodule "auth_db.so"
 loadmodule "xhttp.so"
 loadmodule "kex.so"
 loadmodule "websocket.so"
+loadmodule "mi_rpc.so"
 
 # ----------------- setting module-specific parameters ---------------
 
diff --git a/modules/websocket/ws_frame.c b/modules/websocket/ws_frame.c
new file mode 100644 (file)
index 0000000..50c8d68
--- /dev/null
@@ -0,0 +1,38 @@
+/*
+ * $Id$
+ *
+ * Copyright (C) 2012 Crocodile RCS Ltd
+ *
+ * This file is part of Kamailio, a free SIP server.
+ *
+ * Kamailio is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation; either version 2 of the License, or
+ * (at your option) any later version
+ *
+ * Kamailio is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License 
+ * along with this program; if not, write to the Free Software 
+ * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
+ *
+ */
+
+#include "../../lib/kmi/tree.h"
+#include "ws_frame.h"
+#include "ws_mod.h"
+
+struct mi_root *ws_mi_close(struct mi_root *cmd, void *param)
+{
+       /* TODO close specified or all connections */
+       return init_mi_tree(200, MI_OK_S, MI_OK_LEN);
+}
+
+struct mi_root *ws_mi_ping(struct mi_root *cmd, void *param)
+{
+       /* TODO ping specified connection */
+       return init_mi_tree(200, MI_OK_S, MI_OK_LEN);
+}
diff --git a/modules/websocket/ws_frame.h b/modules/websocket/ws_frame.h
new file mode 100644 (file)
index 0000000..c777a15
--- /dev/null
@@ -0,0 +1,33 @@
+/*
+ * $Id$
+ *
+ * Copyright (C) 2012 Crocodile RCS Ltd
+ *
+ * This file is part of Kamailio, a free SIP server.
+ *
+ * Kamailio is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation; either version 2 of the License, or
+ * (at your option) any later version
+ *
+ * Kamailio is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License 
+ * along with this program; if not, write to the Free Software 
+ * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
+ *
+ */
+
+#ifndef _WS_FRAME_H
+#define _WS_FRAME_H
+
+#include "../../sr_module.h"
+#include "../../lib/kmi/tree.h"
+
+struct mi_root *ws_mi_close(struct mi_root *cmd, void *param);
+struct mi_root *ws_mi_ping(struct mi_root *cmd, void *param);
+
+#endif /* _WS_FRAME_H */
index b46094f..721b2aa 100644 (file)
 #include "../../basex.h"
 #include "../../data_lump_rpl.h"
 #include "../../dprint.h"
+#include "../../locking.h"
+#include "../../lib/kcore/kstats_wrapper.h"
 #include "../../lib/kcore/cmpapi.h"
+#include "../../lib/kmi/tree.h"
 #include "../../parser/msg_parser.h"
 #include "../sl/sl.h"
 #include "ws_handshake.h"
 #define WS_VERSION             (13)
 
 static str str_sip = str_init("sip");
+static str str_upgrade = str_init("upgrade");
 static str str_websocket = str_init("websocket");
 static str str_ws_guid = str_init("258EAFA5-E914-47DA-95CA-C5AB0DC85B11");
 
 /* HTTP headers */
-static str str_connection = str_init("Connection");
-static str str_upgrade = str_init("Upgrade");
-static str str_sec_websocket_accept = str_init("Sec-WebSocket-Accept");
-static str str_sec_websocket_key = str_init("Sec-WebSocket-Key");
-static str str_sec_websocket_protocol = str_init("Sec-WebSocket-Protocol");
-static str str_sec_websocket_version = str_init("Sec-WebSocket-Init");
+static str str_hdr_connection = str_init("Connection");
+static str str_hdr_upgrade = str_init("Upgrade");
+static str str_hdr_sec_websocket_accept = str_init("Sec-WebSocket-Accept");
+static str str_hdr_sec_websocket_key = str_init("Sec-WebSocket-Key");
+static str str_hdr_sec_websocket_protocol = str_init("Sec-WebSocket-Protocol");
+static str str_hdr_sec_websocket_version = str_init("Sec-WebSocket-Version");
 #define CONNECTION             (1<<0)
 #define UPGRADE                        (1<<1)
 #define SEC_WEBSOCKET_ACCEPT   (1<<2)
@@ -56,11 +60,12 @@ static str str_sec_websocket_version = str_init("Sec-WebSocket-Init");
                                        | SEC_WEBSOCKET_PROTOCOL\
                                        | SEC_WEBSOCKET_VERSION)
 
-/* HTTP response text */
-static str str_switching_protocols = str_init("Switching Protocols");
-static str str_bad_request = str_init("Bad Request");
-static str str_upgrade_required = str_init("Upgrade Required");
-static str str_internal_server_error = str_init("Internal Server Error");
+/* HTTP status text */
+static str str_status_switching_protocols = str_init("Switching Protocols");
+static str str_status_bad_request = str_init("Bad Request");
+static str str_status_upgrade_required = str_init("Upgrade Required");
+static str str_status_internal_server_error = str_init("Internal Server Error");
+static str str_status_service_unavailable = str_init("Service Unavailable");
 
 #define HDR_BUF_LEN            (256)
 static char headers_buf[HDR_BUF_LEN];
@@ -70,11 +75,14 @@ static char key_buf[KEY_BUF_LEN];
 
 static int ws_send_reply(sip_msg_t *msg, int code, str *reason, str *hdrs)
 {
+       int cur_cons, max_cons;
+
        if (hdrs && hdrs->len > 0)
        {
                if (add_lump_rpl(msg, hdrs->s, hdrs->len, LUMP_RPL_HDR) == 0)
                {
                        LM_ERR("inserting extra-headers lump\n");
+                       update_stat(ws_failed_handshakes, 1);
                        return -1;
                }
        }
@@ -82,9 +90,28 @@ static int ws_send_reply(sip_msg_t *msg, int code, str *reason, str *hdrs)
        if (ws_slb.freply(msg, code, reason) < 0)
        {
                LM_ERR("sending reply\n");
+               update_stat(ws_failed_handshakes, 1);
                return -1;
        }
 
+       if (code == 101)
+       {
+               update_stat(ws_successful_handshakes, 1);
+
+               lock_get(ws_stats_lock);
+               update_stat(ws_current_connections, 1);
+
+               cur_cons = get_stat_val(ws_current_connections);
+               max_cons = get_stat_val(ws_max_concurrent_connections);
+
+               if (max_cons < cur_cons)
+                       update_stat(ws_max_concurrent_connections,
+                                               cur_cons - max_cons);
+               lock_release(ws_stats_lock);
+       }
+       else
+               update_stat(ws_failed_handshakes, 1);
+
        return 0;
 }
 
@@ -96,59 +123,88 @@ int ws_handle_handshake(struct sip_msg *msg)
        int version;
        struct hdr_field *hdr = msg->headers;
 
+       if (*ws_enabled == 0)
+       {
+               LM_INFO("disabled: bouncing handshake\n");
+               ws_send_reply(msg, 503, &str_status_service_unavailable, NULL);
+               return 0;
+       }
+
        while (hdr != NULL)
        {
                /* Decode and validate Connection */
                if (cmp_hdrname_strzn(&hdr->name,
-                               str_connection.s,
-                               str_connection.len) == 0)
+                               str_hdr_connection.s,
+                               str_hdr_connection.len) == 0)
                {
-                       /* TODO: validate Connection body */
-                       hdr_flags |= CONNECTION;
+                       strlower(&hdr->body);
+                       if (str_search(&hdr->body, &str_upgrade) != NULL)
+                       {
+                               LM_INFO("found %.*s: %.*s\n",
+                                       hdr->name.len, hdr->name.s,
+                                       hdr->body.len, hdr->body.s);
+                               hdr_flags |= CONNECTION;
+                       }
                }
                /* Decode and validate Upgrade */
                else if (cmp_hdrname_strzn(&hdr->name,
-                               str_upgrade.s,
-                               str_upgrade.len) == 0)
+                               str_hdr_upgrade.s,
+                               str_hdr_upgrade.len) == 0)
                {
-                       /* TODO: validate Upgrade body */
-                       hdr_flags |= UPGRADE;
+                       strlower(&hdr->body);
+                       if (str_search(&hdr->body, &str_websocket) != NULL)
+                       {
+                               LM_INFO("found %.*s: %.*s\n",
+                                       hdr->name.len, hdr->name.s,
+                                       hdr->body.len, hdr->body.s);
+                               hdr_flags |= UPGRADE;
+                       }
                }
                /* Decode and validate Sec-WebSocket-Key */
                else if (cmp_hdrname_strzn(&hdr->name,
-                               str_sec_websocket_key.s, 
-                               str_sec_websocket_key.len) == 0) 
+                               str_hdr_sec_websocket_key.s, 
+                               str_hdr_sec_websocket_key.len) == 0) 
                {
                        if (hdr_flags & SEC_WEBSOCKET_KEY)
                        {
                                LM_WARN("%.*s found multiple times\n",
                                        hdr->name.len, hdr->name.s);
-                               ws_send_reply(msg, 400, &str_bad_request, NULL);
+                               ws_send_reply(msg, 400,
+                                               &str_status_bad_request, NULL);
                                return 0;
                        }
 
+                       LM_INFO("found %.*s: %.*s\n",
+                               hdr->name.len, hdr->name.s,
+                               hdr->body.len, hdr->body.s);
                        key = hdr->body;
                        hdr_flags |= SEC_WEBSOCKET_KEY;
                }
                /* Decode and validate Sec-WebSocket-Protocol */
                else if (cmp_hdrname_strzn(&hdr->name,
-                               str_sec_websocket_protocol.s,
-                               str_sec_websocket_protocol.len) == 0)
+                               str_hdr_sec_websocket_protocol.s,
+                               str_hdr_sec_websocket_protocol.len) == 0)
                {
-                       /* TODO: better validation of sip... */
+                       strlower(&hdr->body);
                        if (str_search(&hdr->body, &str_sip) != NULL)
+                       {
+                               LM_INFO("found %.*s: %.*s\n",
+                                       hdr->name.len, hdr->name.s,
+                                       hdr->body.len, hdr->body.s);
                                hdr_flags |= SEC_WEBSOCKET_PROTOCOL;
+                       }
                }
                /* Decode and validate Sec-WebSocket-Version */
                else if (cmp_hdrname_strzn(&hdr->name,
-                               str_sec_websocket_version.s,
-                               str_sec_websocket_version.len) == 0)
+                               str_hdr_sec_websocket_version.s,
+                               str_hdr_sec_websocket_version.len) == 0)
                {
                        if (hdr_flags & SEC_WEBSOCKET_VERSION)
                        {
                                LM_WARN("%.*s found multiple times\n",
                                        hdr->name.len, hdr->name.s);
-                               ws_send_reply(msg, 400, &str_bad_request, NULL);
+                               ws_send_reply(msg, 400,
+                                               &str_status_bad_request, NULL);
                                return 0;
                        }
 
@@ -161,14 +217,18 @@ int ws_handle_handshake(struct sip_msg *msg)
                                headers.s = headers_buf;
                                headers.len = snprintf(headers.s, HDR_BUF_LEN,
                                        "%.*s: %d\r\n",
-                                       str_sec_websocket_version.len,
-                                       str_sec_websocket_version.s,
+                                       str_hdr_sec_websocket_version.len,
+                                       str_hdr_sec_websocket_version.s,
                                        WS_VERSION);
-                               ws_send_reply(msg, 426, &str_upgrade_required,
+                               ws_send_reply(msg, 426,
+                                               &str_status_upgrade_required,
                                                &headers);
                                return 0;
                        }
 
+                       LM_INFO("found %.*s: %.*s\n",
+                               hdr->name.len, hdr->name.s,
+                               hdr->body.len, hdr->body.s);
                        hdr_flags |= SEC_WEBSOCKET_VERSION;
                }
 
@@ -183,9 +243,13 @@ int ws_handle_handshake(struct sip_msg *msg)
                headers.len = snprintf(headers.s, HDR_BUF_LEN,
                                        "%.*s: %.*s\r\n"
                                        "%.*s: %d\r\n",
-                                       str_sec_websocket_protocol.len, str_sec_websocket_protocol.s, str_sip.len, str_sip.s,
-                                       str_sec_websocket_version.len, str_sec_websocket_version.s, WS_VERSION);
-               ws_send_reply(msg, 400, &str_bad_request, NULL);
+                                       str_hdr_sec_websocket_protocol.len,
+                                       str_hdr_sec_websocket_protocol.s,
+                                       str_sip.len, str_sip.s,
+                                       str_hdr_sec_websocket_version.len,
+                                       str_hdr_sec_websocket_version.s,
+                                       WS_VERSION);
+               ws_send_reply(msg, 400, &str_status_bad_request, NULL);
                return 0;
        }
 
@@ -195,7 +259,8 @@ int ws_handle_handshake(struct sip_msg *msg)
        if (reply_key.s == NULL)
        {
                LM_ERR("allocating pkg memory\n");
-               ws_send_reply(msg, 500, &str_internal_server_error, NULL);
+               ws_send_reply(msg, 500, &str_status_internal_server_error,
+                               NULL);
                return 0;
        }
        memcpy(reply_key.s, key.s, key.len);
@@ -214,16 +279,35 @@ int ws_handle_handshake(struct sip_msg *msg)
                        "%.*s: %.*s\r\n"
                        "%.*s: %.*s\r\n"
                        "%.*s: %.*s\r\n",
-                       str_upgrade.len, str_upgrade.s, str_websocket.len, str_websocket.s,
-                       str_connection.len, str_connection.s, str_upgrade.len, str_upgrade.s,
-                       str_sec_websocket_accept.len, str_sec_websocket_accept.s, reply_key.len, reply_key.s,
-                       str_sec_websocket_protocol.len, str_sec_websocket_protocol.s, str_sip.len, str_sip.s);
+                       str_hdr_upgrade.len, str_hdr_upgrade.s,
+                       str_websocket.len, str_websocket.s,
+                       str_hdr_connection.len, str_hdr_connection.s,
+                       str_upgrade.len, str_upgrade.s,
+                       str_hdr_sec_websocket_accept.len,
+                       str_hdr_sec_websocket_accept.s, reply_key.len,
+                       reply_key.s, str_hdr_sec_websocket_protocol.len,
+                       str_hdr_sec_websocket_protocol.s, str_sip.len,
+                       str_sip.s);
 
        /* TODO: make sure Kamailio core sends future requests on this
                 connection directly to this module */
 
        /* Send reply */
-       ws_send_reply(msg, 101, &str_switching_protocols, &headers);
+       ws_send_reply(msg, 101, &str_status_switching_protocols, &headers);
 
        return 0;
 }
+
+struct mi_root *ws_mi_disable(struct mi_root *cmd, void *param)
+{
+       *ws_enabled = 0;
+       LM_WARN("disabling websockets - new connections will be dropped\n");
+       return init_mi_tree(200, MI_OK_S, MI_OK_LEN);
+}
+
+struct mi_root *ws_mi_enable(struct mi_root *cmd, void *param)
+{
+       *ws_enabled = 1;
+       LM_WARN("enabling websockets\n");
+       return init_mi_tree(200, MI_OK_S, MI_OK_LEN);
+}
index b63e799..51480f1 100644 (file)
 #ifndef _WS_HANDSHAKE_H
 #define _WS_HANDSHAKE_H
 
+#include "../../sr_module.h"
 #include "../../parser/msg_parser.h"
 
 int ws_handle_handshake(struct sip_msg *msg);
+struct mi_root *ws_mi_disable(struct mi_root *cmd, void *param);
+struct mi_root *ws_mi_enable(struct mi_root *cmd, void *param);
 
 #endif /* _WS_HANDSHAKE_H */
index 004bed4..23d3f7a 100644 (file)
  */
 
 #include "../../dprint.h"
+#include "../../locking.h"
 #include "../../sr_module.h"
+#include "../../lib/kcore/kstats_wrapper.h"
+#include "../../lib/kmi/mi.h"
+#include "../../lib/kmi/tree.h"
 #include "../../parser/msg_parser.h"
 #include "ws_handshake.h"
+#include "ws_frame.h"
 #include "ws_mod.h"
 
 MODULE_VERSION
 
 static int mod_init(void);
+static void destroy(void);
 
 sl_api_t ws_slb;
-int ws_ping_interval = 25;     /* time (in seconds) after which a Ping will be
-                                  sent on an idle connection */
-int ws_ping_timeout = 1;       /* time (in seconds) to wait for a Pong in
-                                  response to a Ping before closing a
-                                  connection */
+int *ws_enabled;
+gen_lock_t *ws_stats_lock;
 
-static param_export_t params[]=
-{
-       {"ping_interval",       INT_PARAM, &ws_ping_interval},
-       {"ping_timeout",        INT_PARAM, &ws_ping_timeout},
-};
+int ws_ping_interval = 30;     /* time (in seconds) between sending Pings */
+
+stat_var *ws_current_connections;
+stat_var *ws_failed_connections;
+stat_var *ws_failed_handshakes;
+stat_var *ws_local_closed_connections;
+stat_var *ws_max_concurrent_connections;
+stat_var *ws_received_frames;
+stat_var *ws_remote_closed_connections;
+stat_var *ws_successful_handshakes;
+stat_var *ws_transmitted_frames;
+
+static struct mi_root *mi_dump(struct mi_root *cmd, void *param);
 
 static cmd_export_t cmds[]= 
 {
@@ -52,19 +63,49 @@ static cmd_export_t cmds[]=
     {0, 0, 0, 0, 0, 0}
 };
 
+static param_export_t params[]=
+{
+       {"ping_interval",       INT_PARAM, &ws_ping_interval},
+       {0, 0}
+};
+
+static stat_export_t stats[] =
+{
+       {"ws_current_connections",       0, &ws_current_connections },
+       {"ws_failed_connections",        0, &ws_failed_connections },
+       {"ws_failed_handshakes",         0, &ws_failed_handshakes },
+       {"ws_local_closed_connections",  0, &ws_local_closed_connections },
+       {"ws_max_concurrent_connections",0, &ws_max_concurrent_connections },
+       {"ws_received_frames",           0, &ws_received_frames },
+       {"ws_remote_closed_connections", 0, &ws_remote_closed_connections },
+       {"ws_successful_handshakes",     0, &ws_successful_handshakes },
+       {"ws_transmitted_frames",        0, &ws_transmitted_frames },
+       {0, 0, 0}
+};
+
+static mi_export_t mi_cmds[] =
+{
+       { "ws_close",   ws_mi_close,   0, 0, 0},
+       { "ws_disable", ws_mi_disable, 0, 0, 0},
+       { "ws_dump",    mi_dump,       0, 0, 0},
+       { "ws_enable",  ws_mi_enable,  0, 0, 0},
+       { "ws_ping",    ws_mi_ping,    0, 0, 0},
+       { 0, 0, 0, 0, 0}
+};
+
 struct module_exports exports= 
 {
        "websocket",
        DEFAULT_DLFLAGS,        /* dlopen flags */
        cmds,                   /* Exported functions */
        params,                 /* Exported parameters */
-       0,                      /* exported statistics */
-       0,                      /* exported MI functions */
+       stats,                  /* exported statistics */
+       mi_cmds,                /* exported MI functions */
        0,                      /* exported pseudo-variables */
        0,                      /* extra processes */
        mod_init,               /* module initialization function */
        0,                      /* response function */
-       0,                      /* destroy function */
+       destroy,                /* destroy function */
        0                       /* per-child initialization function */
 };
 
@@ -72,7 +113,38 @@ static int mod_init(void)
 {
        if (sl_load_api(&ws_slb) != 0)
        {
-               LM_ERR("cannot bind to SL\n");
+               LM_ERR("binding to SL\n");
+               return -1;
+       }
+
+       if (register_module_stats(exports.name, stats) != 0)
+       {
+               LM_ERR("registering core statistics\n");
+               return -1;
+       }
+
+       if (register_mi_mod(exports.name, mi_cmds) != 0)
+       {
+               LM_ERR("registering MI commands\n");
+               return -1;
+       }
+
+       if ((ws_enabled = (int *) shm_malloc(sizeof(int))) == NULL)
+       {
+               LM_ERR("allocating shared memory\n");
+               return -1;
+       }
+       *ws_enabled = 1;
+
+       if ((ws_stats_lock = lock_alloc()) == NULL)
+       {
+               LM_ERR("allocating lock\n");
+               return -1;
+       }
+       if (lock_init(ws_stats_lock) == NULL)
+       {
+               LM_ERR("initialising lock\n");
+               lock_dealloc(ws_stats_lock);
                return -1;
        }
 
@@ -80,3 +152,18 @@ static int mod_init(void)
 
        return 0;
 }
+
+static void destroy(void)
+{
+       shm_free(ws_enabled);
+       lock_destroy(ws_stats_lock);
+       lock_dealloc(ws_stats_lock);
+
+       /* TODO: close all connections */
+}
+
+static struct mi_root *mi_dump(struct mi_root *cmd, void *param)
+{
+       /* TODO: output all open websocket connections */
+       return init_mi_tree(200, MI_OK_S, MI_OK_LEN);
+}
index 8b5817f..c3cbd24 100644 (file)
 #ifndef _WS_MOD_H
 #define _WS_MOD_H
 
+#include "../../locking.h"
+#include "../../kstats_types.h"
 #include "../sl/sl.h"
 
 extern sl_api_t ws_slb;
-extern int ws_ping_interval;   /* time (in seconds) after which a Ping will be
-                                  sent on an idle connection */
-extern int ws_ping_timeout;    /* time (in seconds) to wait for a Pong in
-                                  response to a Ping before closing a
-                                  connection */
+extern int *ws_enabled;
+extern gen_lock_t *ws_stats_lock;
+
+extern int ws_ping_interval;   /* time (in seconds) between sending Pings */
+
+extern stat_var *ws_current_connections;
+extern stat_var *ws_failed_connections;
+extern stat_var *ws_failed_handshakes;
+extern stat_var *ws_local_closed_connections;
+extern stat_var *ws_max_concurrent_connections;
+extern stat_var *ws_received_frames;
+extern stat_var *ws_remote_closed_connections;
+extern stat_var *ws_successful_handshakes;
+extern stat_var *ws_transmitted_frames;
+
 #endif /* _WS_MOD_H */