LP#1268619: Apache websocket translator module
authorBill Erickson <berick@esilibrary.com>
Fri, 26 Oct 2012 19:06:13 +0000 (15:06 -0400)
committerGalen Charlton <gmc@esilibrary.com>
Tue, 19 Aug 2014 22:34:59 +0000 (15:34 -0700)
Signed-off-by: Bill Erickson <berick@esilibrary.com>
Signed-off-by: Galen Charlton <gmc@esilibrary.com>

src/gateway/Makefile.am
src/gateway/osrf_websocket_translator.c [new file with mode: 0644]

index 425a824..27e9cc1 100644 (file)
@@ -15,7 +15,9 @@ if APACHE_MIN_24
 HAVE_APACHE_MIN_24 = -DAPACHE_MIN_24
 endif
 
-EXTRA_DIST = @srcdir@/apachetools.c @srcdir@/apachetools.h @srcdir@/osrf_json_gateway.c @srcdir@/osrf_http_translator.c
+EXTRA_DIST = @srcdir@/apachetools.c @srcdir@/apachetools.h \
+       @srcdir@/osrf_json_gateway.c @srcdir@/osrf_http_translator.c  \
+       @srcdir@/osrf_websocket_translator.c
 
 AM_CFLAGS = -D_LARGEFILE64_SOURCE $(HAVE_APACHE_MIN_24) -Wall -I@abs_top_srcdir@/include/ -I$(LIBXML2_HEADERS) -I$(APACHE2_HEADERS) -I$(APR_HEADERS)
 AM_LDFLAGS = -L$(LIBDIR) -L@top_builddir@/src/libopensrf
@@ -31,9 +33,11 @@ install-exec-local:
        fi
        $(APXS2) -c $(DEF_LDLIBS) $(AM_CFLAGS) $(AM_LDFLAGS) @srcdir@/osrf_json_gateway.c apachetools.c apachetools.h libopensrf.so
        $(APXS2) -c $(DEF_LDLIBS) $(AM_CFLAGS) $(AM_LDFLAGS) @srcdir@/osrf_http_translator.c apachetools.c apachetools.h libopensrf.so
+       $(APXS2) -c $(DEF_LDLIBS) $(AM_CFLAGS) $(AM_LDFLAGS) @srcdir@/osrf_websocket_translator.c apachetools.c apachetools.h libopensrf.so
        $(MKDIR_P) $(DESTDIR)$(AP_LIBEXECDIR)
        $(APXS2) -i -S LIBEXECDIR=$(DESTDIR)$(AP_LIBEXECDIR) -a @srcdir@/osrf_json_gateway.la
        $(APXS2) -i -S LIBEXECDIR=$(DESTDIR)$(AP_LIBEXECDIR) -a @srcdir@/osrf_http_translator.la
+       $(APXS2) -i -S LIBEXECDIR=$(DESTDIR)$(AP_LIBEXECDIR) -a @srcdir@/osrf_websocket_translator.la
 
 clean-local:
-       rm -f @srcdir@/osrf_http_translator.la @srcdir@/osrf_http_translator.lo @srcdir@/osrf_http_translator.slo @srcdir@/osrf_json_gateway.la @srcdir@/osrf_json_gateway.lo @srcdir@/osrf_json_gateway.slo
+       rm -f @srcdir@/osrf_http_translator.la @srcdir@/osrf_http_translator.lo @srcdir@/osrf_http_translator.slo @srcdir@/osrf_json_gateway.la @srcdir@/osrf_json_gateway.lo @srcdir@/osrf_json_gateway.slo  @srcdir@/osrf_websocket_translator.la @srcdir@/osrf_websocket_translator.lo @srcdir@/osrf_websocket_translator.slo
diff --git a/src/gateway/osrf_websocket_translator.c b/src/gateway/osrf_websocket_translator.c
new file mode 100644 (file)
index 0000000..fd19f2d
--- /dev/null
@@ -0,0 +1,427 @@
+/* -----------------------------------------------------------------------
+ * Copyright 2012 Equinox Software, Inc.
+ * Bill Erickson <berick@esilibrary.com>
+ *
+ * This program is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU General Public License
+ * as published by the Free Software Foundation; either version 2
+ * of the License, or (at your option) any later version.
+ * 
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ * GNU General Public License for more details.
+ * -----------------------------------------------------------------------
+ */
+
+/**
+ * Dumb websocket <-> opensrf gateway.  Wrapped opensrf messages are extracted
+ * and relayed to the opensrf network.  Responses are pulled from the opensrf
+ * network and passed back to the client.  No attempt is made to understand
+ * the contents of the messages.
+ *
+ * Messages to/from the websocket client take the following form:
+ * {
+ *   "service"  : "opensrf.foo", // required for new sessions (inbound only)
+ *   "thread"   : "123454321",   // AKA thread. required for follow-up requests; max 64 chars.
+ *   "log_xid"  : "123..32",     // optional log trace ID, max 64 chars;
+ *   "osrf_msg" : {<osrf_msg>}   // required
+ * }
+ *
+ * Each translator operates with two threads.  One thread receives messages
+ * from the websocket client, translates, and relays them to the opensrf 
+ * network. The second thread collects responses from the opensrf network and 
+ * relays them back to the websocket client.
+ *
+ * The main thread reads from socket A (apache) and writes to socket B 
+ * (openesrf), while the responder thread reads from B and writes to A.  The 
+ * apr data structures used are threadsafe.  For now, no thread mutex's are 
+ * used.
+ *
+ * Note that with a "thread", which allows us to identify the opensrf session,
+ * the caller does not need to provide a recipient address.  The "service" is
+ * only required to start a new opensrf session.  After the sesession is 
+ * started, all future communication is based solely on the thread.  
+ *
+ * We use jsonParseRaw and jsonObjectToJSONRaw since this service does not care 
+ * about the contents of the messages.
+ */
+
+/**
+ * TODO:
+ * short-timeout mode for brick detachment where inactivity timeout drops way 
+ * down for graceful disconnects.
+ */
+
+#include "httpd.h"
+#include "http_log.h"
+#include "http_log.h"
+#include "apr_strings.h"
+#include "apr_thread_proc.h"
+#include "apr_hash.h"
+#include "websocket_plugin.h"
+#include "opensrf/osrf_json.h"
+#include "opensrf/transport_client.h"
+#include "opensrf/transport_message.h"
+#include "opensrf/osrf_system.h"                                                
+#include "opensrf/osrfConfig.h"
+
+#define MAX_THREAD_SIZE 64
+#define RECIP_BUF_SIZE 128
+static char recipient_buf[RECIP_BUF_SIZE]; // reusable recipient buffer
+static transport_client *osrf_handle = NULL;
+
+typedef struct _osrfWebsocketTranslator {
+    const WebSocketServer *server;
+    apr_pool_t *main_pool; // standline per-process pool
+    apr_pool_t *session_pool; // child of trans->main_pool; per-session
+    apr_hash_t *session_cache; 
+    apr_thread_t *responder_thread;
+    int client_connected;
+    char* osrf_router;
+    char* osrf_domain;
+} osrfWebsocketTranslator;
+
+static osrfWebsocketTranslator *trans = NULL;
+
+
+/**
+ * Responder thread main body.
+ * Collects responses from the opensrf network and relays them to the 
+ * websocket caller.
+ */
+void* APR_THREAD_FUNC osrf_responder_thread_main(apr_thread_t *thread, void *data) {
+
+    request_rec *r = trans->server->request(trans->server);
+    jsonObject *msg_wrapper;
+    char *msg_string;
+
+    while (1) {
+
+        transport_message *msg = client_recv(osrf_handle, -1);
+        if (!msg) continue; // early exit on interrupt
+        
+        // discard responses received after client disconnect
+        if (!trans->client_connected) {
+            ap_log_rerror(APLOG_MARK, APLOG_DEBUG, 0, r, 
+                "WS discarding response for thread=%s, xid=%s", 
+                    msg->thread, msg->osrf_xid);
+            message_free(msg);                                                         
+            continue; 
+        }
+
+        ap_log_rerror(APLOG_MARK, APLOG_DEBUG, 0, r, 
+            "WS received opensrf response for thread=%s, xid=%s", 
+                msg->thread, msg->osrf_xid);
+
+        // build the wrapper object
+        msg_wrapper = jsonNewObject(NULL);
+        jsonObjectSetKey(msg_wrapper, "thread", jsonNewObject(msg->thread));
+        jsonObjectSetKey(msg_wrapper, "log_xid", jsonNewObject(msg->osrf_xid));
+        jsonObjectSetKey(msg_wrapper, "osrf_msg", jsonParseRaw(msg->body));
+
+        if (msg->is_error) {
+            ap_log_rerror(APLOG_MARK, APLOG_ERR, 0, r, 
+                "WS received jabber error message in response to thread=%s and xid=%s", 
+                    msg->thread, msg->osrf_xid);
+            jsonObjectSetKey(msg_wrapper, "transport_error", jsonNewBoolObject(1));
+        }
+
+        msg_string = jsonObjectToJSONRaw(msg_wrapper);
+
+        // deliver the wrapped message json to the websocket client
+        trans->server->send(trans->server, MESSAGE_TYPE_TEXT, 
+            (unsigned char*) msg_string, strlen(msg_string));
+
+        // capture the true message sender
+        // TODO: this will grow to add one entry per client session.  
+        // need a last-touched timeout mechanism to periodically remove old entries
+        if (!apr_hash_get(trans->session_cache, msg->thread, APR_HASH_KEY_STRING)) {
+
+            ap_log_rerror(APLOG_MARK, APLOG_DEBUG, 0, r, 
+                "WS caching sender thread=%s, sender=%s", msg->thread, msg->sender);
+
+            apr_hash_set(trans->session_cache, 
+                apr_pstrdup(trans->session_pool, msg->thread),
+                APR_HASH_KEY_STRING, 
+                apr_pstrdup(trans->session_pool, msg->sender));
+        }
+
+        free(msg_string);
+        jsonObjectFree(msg_wrapper);
+        message_free(msg);                                                         
+    }
+
+    return NULL;
+}
+
+/**
+ * Allocate the session cache and create the responder thread
+ */
+int child_init(const WebSocketServer *server) {
+
+    apr_pool_t *pool = NULL;                                                
+    apr_thread_t *thread = NULL;
+    apr_threadattr_t *thread_attr = NULL;
+    request_rec *r = server->request(server);
+        
+    ap_log_rerror(APLOG_MARK, APLOG_DEBUG, 0, r, "WS child_init");
+
+    // osrf_handle will already be connected if this is not the first request
+    // served by this process.
+    if ( !(osrf_handle = osrfSystemGetTransportClient()) ) {
+        char* config_file = "/openils/conf/opensrf_core.xml";
+        char* config_ctx = "gateway"; //TODO config
+        if (!osrfSystemBootstrapClientResc(config_file, config_ctx, "websocket")) {   
+            ap_log_rerror(APLOG_MARK, APLOG_ERR, 0, r,                              
+                "WS unable to bootstrap OpenSRF client with config %s", config_file); 
+            return 1;
+        }
+
+        osrf_handle = osrfSystemGetTransportClient();
+    }
+
+    // create a standalone pool for our translator data
+    if (apr_pool_create(&pool, NULL) != APR_SUCCESS) {
+        ap_log_rerror(APLOG_MARK, APLOG_ERR, 0, r, "WS Unable to create apr_pool");
+        return 1;
+    }
+
+
+    // allocate our static translator instance
+    trans = (osrfWebsocketTranslator*) 
+        apr_palloc(pool, sizeof(osrfWebsocketTranslator));
+
+    if (trans == NULL) {
+        ap_log_rerror(APLOG_MARK, APLOG_ERR, 0, r, "WS Unable to create translator");
+        return 1;
+    }
+
+    trans->main_pool = pool;
+    trans->server = server;
+    trans->osrf_router = osrfConfigGetValue(NULL, "/router_name");                      
+    trans->osrf_domain = osrfConfigGetValue(NULL, "/domain");
+
+    // Create the responder thread.  Once created, it runs for the lifetime
+    // of this process.
+    if ( (apr_threadattr_create(&thread_attr, trans->main_pool) == APR_SUCCESS) &&
+         (apr_threadattr_detach_set(thread_attr, 0) == APR_SUCCESS) &&
+         (apr_thread_create(&thread, thread_attr, 
+                osrf_responder_thread_main, trans, trans->main_pool) == APR_SUCCESS)) {
+
+        trans->responder_thread = thread;
+        
+    } else {
+        ap_log_rerror(APLOG_MARK, APLOG_ERR, 0, r, 
+            "WS unable to create responder thread");
+        return 1;
+    }
+
+    return APR_SUCCESS;
+}
+
+/**
+ * Create the per-client translator
+ */
+void* CALLBACK on_connect_handler(const WebSocketServer *server) {
+    request_rec *r = server->request(server);
+    apr_pool_t *pool;
+
+    ap_log_rerror(APLOG_MARK, APLOG_DEBUG, 0, r, 
+        "WS connect from %s", r->connection->remote_ip);
+
+    if (!trans) {
+        if (child_init(server) != APR_SUCCESS) {
+            return NULL;
+        }
+    }
+
+    // create a standalone pool for the session cache values, which will be
+    // destroyed on client disconnect.
+    if (apr_pool_create(&pool, trans->main_pool) != APR_SUCCESS) {
+        ap_log_rerror(APLOG_MARK, APLOG_ERR, 0, r, 
+            "WS Unable to create apr_pool");
+        return NULL;
+    }
+
+    trans->session_pool = pool;
+    trans->session_cache = apr_hash_make(trans->session_pool);
+
+    ap_log_rerror(APLOG_MARK, APLOG_ERR, 0, r, 
+        "WS created new pool %x", trans->session_pool);
+
+    if (trans->session_cache == NULL) {
+        ap_log_rerror(APLOG_MARK, APLOG_ERR, 0, r, 
+            "WS unable to create session cache");
+        return NULL;
+    }
+
+    ap_log_rerror(APLOG_MARK, APLOG_ERR, 0, r, 
+        "WS created new hash %x", trans->session_cache);
+
+    trans->client_connected = 1;
+    return trans;
+}
+
+
+
+/**
+ * Parse opensrf request and relay the request to the opensrf network.
+ */
+static size_t CALLBACK on_message_handler(void *data,
+                const WebSocketServer *server, const int type, 
+                unsigned char *buffer, const size_t buffer_size) {
+
+    request_rec *r = server->request(server);
+
+    jsonObject *msg_wrapper = NULL; // free me
+    const jsonObject *tmp_obj = NULL;
+    const jsonObject *osrf_msg = NULL;
+    const char *service = NULL;
+    const char *thread = NULL;
+    const char *log_xid = NULL;
+    char *msg_body = NULL;
+    char *recipient = NULL;
+
+    if (buffer_size <= 0) return OK;
+
+    ap_log_rerror(APLOG_MARK, APLOG_DEBUG, 0, r, 
+        "WS received message size=%d", buffer_size);
+
+    // buffer may not be \0-terminated, which jsonParse requires
+    char buf[buffer_size + 1];
+    memcpy(buf, buffer, buffer_size);
+    buf[buffer_size] = '\0';
+
+    msg_wrapper = jsonParseRaw(buf);
+
+    if (msg_wrapper == NULL) {
+        ap_log_rerror(APLOG_MARK, 
+            APLOG_NOTICE, 0, r, "WS Invalid JSON: %s", buf);
+        return HTTP_BAD_REQUEST;
+    }
+
+    osrf_msg = jsonObjectGetKeyConst(msg_wrapper, "osrf_msg");
+
+    if (tmp_obj = jsonObjectGetKeyConst(msg_wrapper, "service")) 
+        service = jsonObjectGetString(tmp_obj);
+
+    if (tmp_obj = jsonObjectGetKeyConst(msg_wrapper, "thread")) 
+        thread = jsonObjectGetString(tmp_obj);
+
+    if (tmp_obj = jsonObjectGetKeyConst(msg_wrapper, "log_xid")) 
+        log_xid = jsonObjectGetString(tmp_obj);
+
+    if (log_xid) {
+        // use the caller-provide log trace id
+        if (strlen(log_xid) > MAX_THREAD_SIZE) {
+            ap_log_rerror(APLOG_MARK, APLOG_NOTICE, 
+                0, r, "WS log_xid exceeds max length");
+            return HTTP_BAD_REQUEST;
+        }
+        osrfLogSetXid(log_xid); // TODO: make with with non-client
+    } else {
+        // generate a new log trace id for this relay
+        osrfLogMkXid();
+    }
+
+    if (thread) {
+
+        if (strlen(thread) > MAX_THREAD_SIZE) {
+            ap_log_rerror(APLOG_MARK, APLOG_NOTICE, 
+                0, r, "WS thread exceeds max length");
+            return HTTP_BAD_REQUEST;
+        }
+
+        // since clients can provide their own threads at session start time,
+        // the presence of a thread does not guarantee a cached recipient
+        recipient = (char*) apr_hash_get(
+            trans->session_cache, thread, APR_HASH_KEY_STRING);
+
+        if (recipient) {
+            ap_log_rerror(APLOG_MARK, APLOG_DEBUG, 0, r, 
+                "WS found cached recipient %s", recipient);
+        }
+    }
+
+    if (!recipient) {
+
+        if (service) {
+            int size = snprintf(recipient_buf, RECIP_BUF_SIZE - 1,
+                "%s@%s/%s", trans->osrf_router, trans->osrf_domain, service);                                    
+            recipient_buf[size] = '\0';                                          
+            recipient = recipient_buf;
+
+        } else {
+            ap_log_rerror(APLOG_MARK, APLOG_NOTICE, 
+                0, r, "WS Unable to determine recipient");
+            return HTTP_BAD_REQUEST;
+        }
+    }
+
+    // TODO: activity log entry? -- requires message analysis
+    ap_log_rerror(APLOG_MARK, APLOG_DEBUG, 0, r, 
+        "WS relaying message thread=%s, xid=%s, recipient=%s", 
+            thread, osrfLogGetXid(), recipient);
+
+    msg_body = jsonObjectToJSONRaw(osrf_msg);
+
+    transport_message *tmsg = message_init(
+        msg_body, NULL, thread, recipient, NULL);
+
+    message_set_osrf_xid(tmsg, osrfLogGetXid());                                
+    client_send_message(osrf_handle, tmsg);                                   
+    osrfLogClearXid();
+
+    message_free(tmsg);                                                         
+    free(msg_wrapper);
+    free(msg_body);
+
+    return OK;
+}
+
+
+/**
+ * Release all memory allocated from the translator pool and kill the pool.
+ */
+void CALLBACK on_disconnect_handler(
+    void *data, const WebSocketServer *server) {
+
+    osrfWebsocketTranslator *trans = (osrfWebsocketTranslator*) data;
+    trans->client_connected = 0;
+
+    apr_hash_clear(trans->session_cache);
+    apr_pool_destroy(trans->session_pool);
+    trans->session_pool = NULL;
+    trans->session_cache = NULL;
+
+    request_rec *r = server->request(server);
+    ap_log_rerror(APLOG_MARK, APLOG_DEBUG, 0, r, 
+        "WS disconnect from %s", r->connection->remote_ip);
+}
+
+void CALLBACK on_destroy_handler(WebSocketPlugin *plugin) {
+    fprintf(stderr, "WS on_destroy_handler()\n");
+    fflush(stderr);
+
+    if (trans) {
+        apr_thread_exit(trans->responder_thread, APR_SUCCESS);
+        apr_pool_destroy(trans->main_pool);
+    }
+
+    trans = NULL;
+}
+
+static WebSocketPlugin osrf_websocket_plugin = {
+    sizeof(WebSocketPlugin),
+    WEBSOCKET_PLUGIN_VERSION_0,
+    on_destroy_handler,
+    on_connect_handler,
+    on_message_handler,
+    on_disconnect_handler
+};
+
+extern EXPORT WebSocketPlugin * CALLBACK osrf_websocket_init() {
+    return &osrf_websocket_plugin;
+}
+