LP#1268619: websocket translator
authorBill Erickson <berick@esilibrary.com>
Mon, 12 Nov 2012 21:46:19 +0000 (16:46 -0500)
committerGalen Charlton <gmc@esilibrary.com>
Tue, 19 Aug 2014 22:50:47 +0000 (15:50 -0700)
* starting packet inspection
* activity log; recipient removal
* only cache connected recipients; use request_rec pool for session_pool parent
* wrap all thread work in mutex
* session memory goodness

Signed-off-by: Bill Erickson <berick@esilibrary.com>
Signed-off-by: Galen Charlton <gmc@esilibrary.com>

src/gateway/osrf_websocket_translator.c

index b6205d8..35f986d 100644 (file)
 
 #define MAX_THREAD_SIZE 64
 #define RECIP_BUF_SIZE 128
+#define WEBSOCKET_TRANSLATOR_INGRESS "ws-translator-v1"
 
 typedef struct _osrfWebsocketTranslator {
     const WebSocketServer *server;
     apr_pool_t *main_pool; // standalone per-process pool
-    apr_pool_t *session_pool; // child of trans->main_pool; per-session
+    apr_pool_t *session_pool; // child of r->pool; per-session
     apr_hash_t *session_cache; 
     apr_thread_t *responder_thread;
+    apr_thread_mutex_t *mutex;
     int client_connected;
     char* osrf_router;
     char* osrf_domain;
@@ -84,80 +86,168 @@ static transport_client *osrf_handle = NULL;
 static char recipient_buf[RECIP_BUF_SIZE]; // reusable recipient buffer
 
 
-/**
- * Responder thread main body.
- * Collects responses from the opensrf network and relays them to the 
- * websocket caller.
- */
-void* APR_THREAD_FUNC osrf_responder_thread_main(apr_thread_t *thread, void *data) {
+static void clear_cached_recipient(const char* thread) {
+    apr_pool_t *pool = NULL;                                                
 
-    transport_message *tmsg;
-    jsonObject *msg_wrapper;
-    char *msg_string;
+    if (apr_hash_get(trans->session_cache, thread, APR_HASH_KEY_STRING)) {
 
-    while (1) {
+        osrfLogDebug(OSRF_LOG_MARK, "WS removing cached recipient on disconnect");
 
-        tmsg = client_recv(osrf_handle, -1);
+        // remove it from the hash
+        apr_hash_set(trans->session_cache, thread, APR_HASH_KEY_STRING, NULL);
 
-        if (!tmsg) continue; // early exit on interrupt
-        
-        // discard responses received after client disconnect
-        if (!trans->client_connected) {
-            osrfLogDebug(OSRF_LOG_MARK, 
-                "WS discarding response for thread=%s, xid=%s", 
-                tmsg->thread, tmsg->osrf_xid);
-            message_free(tmsg);                                                         
-            continue; 
+        if (apr_hash_count(trans->session_cache) == 0) {
+            osrfLogDebug(OSRF_LOG_MARK, "WS re-setting session_pool");
+
+            // memory accumulates in the session_pool as sessions are cached then 
+            // un-cached.  Un-caching removes strings from the hash, but not the 
+            // pool itself.  That only happens when the pool is destroyed. destroy 
+            // the session pool to clear any lingering memory
+            apr_pool_destroy(trans->session_pool);
+    
+            // create a standalone pool for our translator data
+            if (apr_pool_create(&pool, NULL) != APR_SUCCESS) {
+                osrfLogError(OSRF_LOG_MARK, "WS Unable to create session_pool");
+                trans->session_pool = NULL;
+                return;
+            }
+
+            trans->session_pool = pool;
         }
+    }
+}
+
+
+
+void* osrf_responder_thread_main_body(transport_message *tmsg) {
 
+    osrfList *msg_list = NULL;
+    osrfMessage *one_msg = NULL;
+    int i;
+
+    osrfLogDebug(OSRF_LOG_MARK, 
+        "WS received opensrf response for thread=%s, xid=%s", 
+            tmsg->thread, tmsg->osrf_xid);
+
+    // first we need to perform some maintenance
+    msg_list = osrfMessageDeserialize(tmsg->body, NULL);
+
+    for (i = 0; i < msg_list->size; i++) {
+        one_msg = OSRF_LIST_GET_INDEX(msg_list, i);
 
         osrfLogDebug(OSRF_LOG_MARK, 
-            "WS received opensrf response for thread=%s, xid=%s", 
-                tmsg->thread, tmsg->osrf_xid);
-
-        // build the wrapper object
-        msg_wrapper = jsonNewObject(NULL);
-        jsonObjectSetKey(msg_wrapper, "thread", jsonNewObject(tmsg->thread));
-        jsonObjectSetKey(msg_wrapper, "log_xid", jsonNewObject(tmsg->osrf_xid));
-        jsonObjectSetKey(msg_wrapper, "osrf_msg", jsonParseRaw(tmsg->body));
-
-        if (tmsg->is_error) {
-            fprintf(stderr,  
-                "WS received jabber error message in response to thread=%s and xid=%s", 
-                tmsg->thread, tmsg->osrf_xid);
-            fflush(stderr);
-            jsonObjectSetKey(msg_wrapper, "transport_error", jsonNewBoolObject(1));
+            "WS returned response of type %d", one_msg->m_type);
+
+        /*  if our client just successfully connected to an opensrf service,
+            cache the sender so that future calls on this thread will use
+            the correct recipient. */
+        if (one_msg && one_msg->m_type == STATUS) {
+
+
+            // only cache recipients if the client is still connected
+            if (trans->client_connected && 
+                    one_msg->status_code == OSRF_STATUS_OK) {
+
+                if (!apr_hash_get(trans->session_cache, 
+                        tmsg->thread, APR_HASH_KEY_STRING)) {
+
+                    osrfLogDebug(OSRF_LOG_MARK, 
+                        "WS caching sender thread=%s, sender=%s", 
+                        tmsg->thread, tmsg->sender);
+
+                    apr_hash_set(trans->session_cache, 
+                        apr_pstrdup(trans->session_pool, tmsg->thread),
+                        APR_HASH_KEY_STRING, 
+                        apr_pstrdup(trans->session_pool, tmsg->sender));
+                }
+
+            } else {
+
+                // connection timed out; clear the cached recipient
+                // regardless of whether the client is still connected
+                if (one_msg->status_code == OSRF_STATUS_TIMEOUT)
+                    clear_cached_recipient(tmsg->thread);
+            }
         }
+    }
+
+    // maintenance is done
+    osrfListFree(msg_list);
+
+    if (!trans->client_connected) {
+        // responses received after client disconnect are discarded
+
+        osrfLogDebug(OSRF_LOG_MARK, 
+            "WS discarding response for thread=%s, xid=%s", 
+            tmsg->thread, tmsg->osrf_xid);
+
+        return;
+    }
+
+    
+    // client is still connected; relay the messages to the client
+    jsonObject *msg_wrapper = NULL;
+    char *msg_string = NULL;
+
+    // build the wrapper object
+    msg_wrapper = jsonNewObject(NULL);
+    jsonObjectSetKey(msg_wrapper, "thread", jsonNewObject(tmsg->thread));
+    jsonObjectSetKey(msg_wrapper, "log_xid", jsonNewObject(tmsg->osrf_xid));
+    jsonObjectSetKey(msg_wrapper, "osrf_msg", jsonParseRaw(tmsg->body));
+
+    if (tmsg->is_error) {
+        fprintf(stderr,  
+            "WS received jabber error message in response to thread=%s and xid=%s", 
+            tmsg->thread, tmsg->osrf_xid);
+        fflush(stderr);
+        jsonObjectSetKey(msg_wrapper, "transport_error", jsonNewBoolObject(1));
+    }
+
+    msg_string = jsonObjectToJSONRaw(msg_wrapper);
+
+    // deliver the wrapped message json to the websocket client
+    trans->server->send(trans->server, MESSAGE_TYPE_TEXT, 
+        (unsigned char*) msg_string, strlen(msg_string));
+
+    free(msg_string);
+    jsonObjectFree(msg_wrapper);
+
+}
+
+/**
+ * Responder thread main body.
+ * Collects responses from the opensrf network and relays them to the 
+ * websocket caller.
+ */
+void* APR_THREAD_FUNC osrf_responder_thread_main(apr_thread_t *thread, void *data) {
 
-        msg_string = jsonObjectToJSONRaw(msg_wrapper);
+    transport_message *tmsg;
+    while (1) {
 
-        // deliver the wrapped message json to the websocket client
-        trans->server->send(trans->server, MESSAGE_TYPE_TEXT, 
-            (unsigned char*) msg_string, strlen(msg_string));
+        if (apr_thread_mutex_unlock(trans->mutex) != APR_SUCCESS) {
+            osrfLogError(OSRF_LOG_MARK, "WS error un-locking thread mutex");
+            return NULL;
+        }
 
-        // capture the true message sender
-        // TODO: this will grow to add one entry per client session.  
-        // need to ensure that connected-sessions don't last /too/ long or create 
-        // a last-touched timeout mechanism to periodically remove old  entries
-        if (!apr_hash_get(trans->session_cache, tmsg->thread, APR_HASH_KEY_STRING)) {
+        // wait for a response
+        tmsg = client_recv(osrf_handle, -1);
 
-            osrfLogDebug(OSRF_LOG_MARK, 
-                "WS caching sender thread=%s, sender=%s", tmsg->thread, tmsg->sender);
+        if (!tmsg) continue; // early exit on interrupt
 
-            apr_hash_set(trans->session_cache, 
-                apr_pstrdup(trans->session_pool, tmsg->thread),
-                APR_HASH_KEY_STRING, 
-                apr_pstrdup(trans->session_pool, tmsg->sender));
+        if (apr_thread_mutex_lock(trans->mutex) != APR_SUCCESS) {
+            osrfLogError(OSRF_LOG_MARK, "WS error locking thread mutex");
+            return NULL;
         }
 
-        free(msg_string);
-        jsonObjectFree(msg_wrapper);
+        osrf_responder_thread_main_body(tmsg);
         message_free(tmsg);                                                         
     }
 
     return NULL;
 }
 
+
+
 /**
  * Allocate the session cache and create the responder thread
  */
@@ -166,6 +256,7 @@ int child_init(const WebSocketServer *server) {
     apr_pool_t *pool = NULL;                                                
     apr_thread_t *thread = NULL;
     apr_threadattr_t *thread_attr = NULL;
+    apr_thread_mutex_t *mutex = NULL;
     request_rec *r = server->request(server);
         
     osrfLogDebug(OSRF_LOG_MARK, "WS child_init");
@@ -205,6 +296,13 @@ int child_init(const WebSocketServer *server) {
     trans->osrf_router = osrfConfigGetValue(NULL, "/router_name");                      
     trans->osrf_domain = osrfConfigGetValue(NULL, "/domain");
 
+    trans->session_cache = apr_hash_make(pool);
+
+    if (trans->session_cache == NULL) {
+        osrfLogError(OSRF_LOG_MARK, "WS unable to create session cache");
+        return 1;
+    }
+
     // Create the responder thread.  Once created, 
     // it runs for the lifetime of this process.
     if ( (apr_threadattr_create(&thread_attr, trans->main_pool) == APR_SUCCESS) &&
@@ -219,6 +317,15 @@ int child_init(const WebSocketServer *server) {
         return 1;
     }
 
+    if (apr_thread_mutex_create(
+            &mutex, APR_THREAD_MUTEX_UNNESTED, 
+            trans->main_pool) != APR_SUCCESS) {
+        osrfLogError(OSRF_LOG_MARK, "WS unable to create thread mutex");
+        return 1;
+    }
+
+    trans->mutex = mutex;
+
     return APR_SUCCESS;
 }
 
@@ -239,31 +346,104 @@ void* CALLBACK on_connect_handler(const WebSocketServer *server) {
         }
     }
 
-    // create a standalone pool for the session cache values, which will be
-    // destroyed on client disconnect.
-    if (apr_pool_create(&pool, trans->main_pool) != APR_SUCCESS) {
+    // create a standalone pool for the session cache values
+    // this pool will be destroyed and re-created regularly to 
+    // clear session memory
+    if (apr_pool_create(&pool, r->pool) != APR_SUCCESS) {
         osrfLogError(OSRF_LOG_MARK, "WS Unable to create apr_pool");
         return NULL;
     }
 
     trans->session_pool = pool;
-    trans->session_cache = apr_hash_make(trans->session_pool);
-
-    if (trans->session_cache == NULL) {
-        osrfLogError(OSRF_LOG_MARK, "WS unable to create session cache");
-        return NULL;
-    }
-
     trans->client_connected = 1;
     return trans;
 }
 
 
+/** 
+ * for each inbound opensrf message:
+ * 1. Stamp the ingress
+ * 2. REQUEST: log it as activity
+ * 3. DISCONNECT: remove the cached recipient
+ * then re-string-ify for xmpp delivery
+ */
+
+static char* extract_inbound_messages(
+        const request_rec *r, 
+        const char* service, 
+        const char* thread, 
+        const char* recipient, 
+        const jsonObject *osrf_msg) {
+
+    int i;
+    int num_msgs = osrf_msg->size;
+    osrfMessage* msg;
+    osrfMessage* msg_list[num_msgs];
+
+    // here we do an extra json round-trip to get the data
+    // in a form osrf_message_deserialize can understand
+    char *osrf_msg_json = jsonObjectToJSON(osrf_msg);
+    osrf_message_deserialize(osrf_msg_json, msg_list, num_msgs);
+    free(osrf_msg_json);
+
+    // should we require the caller to always pass the service?
+    if (service == NULL) service = "";
+
+    for(i = 0; i < num_msgs; i++) {
+        msg = msg_list[i];
+        osrfMessageSetIngress(msg, WEBSOCKET_TRANSLATOR_INGRESS);
+
+        switch(msg->m_type) {
+
+            case REQUEST: {
+                const jsonObject* params = msg->_params;
+                growing_buffer* act = buffer_init(128);
+                char* method = msg->method_name;
+                buffer_fadd(act, "[%s] [%s] %s %s", 
+                    r->connection->remote_ip, "", service, method);
+
+                const jsonObject* obj = NULL;
+                int i = 0;
+                const char* str;
+                int redactParams = 0;
+                while( (str = osrfStringArrayGetString(log_protect_arr, i++)) ) {
+                    if(!strncmp(method, str, strlen(str))) {
+                        redactParams = 1;
+                        break;
+                    }
+                }
+                if(redactParams) {
+                    OSRF_BUFFER_ADD(act, " **PARAMS REDACTED**");
+                } else {
+                    i = 0;
+                    while((obj = jsonObjectGetIndex(params, i++))) {
+                        char* str = jsonObjectToJSON(obj);
+                        if( i == 1 )
+                            OSRF_BUFFER_ADD(act, " ");
+                        else
+                            OSRF_BUFFER_ADD(act, ", ");
+                        OSRF_BUFFER_ADD(act, str);
+                        free(str);
+                    }
+                }
+                osrfLogActivity(OSRF_LOG_MARK, "%s", act->buf);
+                buffer_free(act);
+                break;
+            }
+
+            case DISCONNECT:
+                clear_cached_recipient(thread);
+                break;
+        }
+    }
+
+    return osrfMessageSerializeBatch(msg_list, num_msgs);
+}
 
 /**
  * Parse opensrf request and relay the request to the opensrf network.
  */
-static size_t CALLBACK on_message_handler(void *data,
+static size_t on_message_handler_body(void *data,
                 const WebSocketServer *server, const int type, 
                 unsigned char *buffer, const size_t buffer_size) {
 
@@ -277,6 +457,7 @@ static size_t CALLBACK on_message_handler(void *data,
     const char *log_xid = NULL;
     char *msg_body = NULL;
     char *recipient = NULL;
+    int i;
 
     if (buffer_size <= 0) return OK;
 
@@ -287,7 +468,7 @@ static size_t CALLBACK on_message_handler(void *data,
     memcpy(buf, buffer, buffer_size);
     buf[buffer_size] = '\0';
 
-    msg_wrapper = jsonParseRaw(buf);
+    msg_wrapper = jsonParse(buf);
 
     if (msg_wrapper == NULL) {
         osrfLogWarning(OSRF_LOG_MARK, "WS Invalid JSON: %s", buf);
@@ -353,20 +534,21 @@ static size_t CALLBACK on_message_handler(void *data,
         }
     }
 
-    // TODO: activity log entry? -- requires message analysis
     osrfLogDebug(OSRF_LOG_MARK, 
         "WS relaying message thread=%s, xid=%s, recipient=%s", 
             thread, osrfLogGetXid(), recipient);
 
-    msg_body = jsonObjectToJSONRaw(osrf_msg);
+    msg_body = extract_inbound_messages(
+        r, service, thread, recipient, osrf_msg);
 
     transport_message *tmsg = message_init(
         msg_body, NULL, thread, recipient, NULL);
 
-    message_set_osrf_xid(tmsg, osrfLogGetXid());                                
-    client_send_message(osrf_handle, tmsg);                                   
-    osrfLogClearXid();
+    message_set_osrf_xid(tmsg, osrfLogGetXid());
+    client_send_message(osrf_handle, tmsg);
+
 
+    osrfLogClearXid();
     message_free(tmsg);                                                         
     jsonObjectFree(msg_wrapper);
     free(msg_body);
@@ -374,6 +556,25 @@ static size_t CALLBACK on_message_handler(void *data,
     return OK;
 }
 
+static size_t CALLBACK on_message_handler(void *data,
+                const WebSocketServer *server, const int type, 
+                unsigned char *buffer, const size_t buffer_size) {
+
+    if (apr_thread_mutex_lock(trans->mutex) != APR_SUCCESS) {
+        osrfLogError(OSRF_LOG_MARK, "WS error locking thread mutex");
+        return 1; // TODO: map to apr_status_t value?
+    }
+
+    apr_status_t stat = on_message_handler_body(data, server, type, buffer, buffer_size);
+
+    if (apr_thread_mutex_unlock(trans->mutex) != APR_SUCCESS) {
+        osrfLogError(OSRF_LOG_MARK, "WS error locking thread mutex");
+        return 1;
+    }
+
+    return stat;
+}
+
 
 /**
  * Release all memory allocated from the translator pool and kill the pool.
@@ -384,10 +585,14 @@ void CALLBACK on_disconnect_handler(
     osrfWebsocketTranslator *trans = (osrfWebsocketTranslator*) data;
     trans->client_connected = 0;
 
-    apr_hash_clear(trans->session_cache);
+    /*
+    It's not necessary to destroy our session_pool, since
+    it's a child of the apache request_rec pool, which is 
+    destroyed after client disconnect.
     apr_pool_destroy(trans->session_pool);
+    */
+    
     trans->session_pool = NULL;
-    trans->session_cache = NULL;
 
     request_rec *r = server->request(server);
     osrfLogDebug(OSRF_LOG_MARK, 
@@ -401,6 +606,7 @@ void CALLBACK on_disconnect_handler(
 void CALLBACK on_destroy_handler(WebSocketPlugin *plugin) {
     if (trans) {
         apr_thread_exit(trans->responder_thread, APR_SUCCESS);
+        apr_thread_mutex_destroy(trans->mutex);
         apr_pool_destroy(trans->main_pool);
     }