LP#1268619: websocket gateway repairs and cleanup
authorBill Erickson <berick@esilibrary.com>
Mon, 29 Oct 2012 21:27:44 +0000 (17:27 -0400)
committerGalen Charlton <gmc@esilibrary.com>
Tue, 19 Aug 2014 22:50:15 +0000 (15:50 -0700)
* use jsonObjectFree() on jsonObjets, not free();
* removed some debugging logs
* accommodate API changes for Apache 2.4
* safer logging:

  Avoid using ap_log_rerror, in particular referencing server->request
  from the responder thread, since the request_rec will be invalid after
  on_disconnect is called.

Signed-off-by: Bill Erickson <berick@esilibrary.com>
Signed-off-by: Galen Charlton <gmc@esilibrary.com>

src/gateway/osrf_websocket_translator.c

index fd19f2d..b6205d8 100644 (file)
  */
 
 #include "httpd.h"
-#include "http_log.h"
-#include "http_log.h"
 #include "apr_strings.h"
 #include "apr_thread_proc.h"
 #include "apr_hash.h"
 #include "websocket_plugin.h"
+#include "opensrf/log.h"
 #include "opensrf/osrf_json.h"
 #include "opensrf/transport_client.h"
 #include "opensrf/transport_message.h"
 
 #define MAX_THREAD_SIZE 64
 #define RECIP_BUF_SIZE 128
-static char recipient_buf[RECIP_BUF_SIZE]; // reusable recipient buffer
-static transport_client *osrf_handle = NULL;
 
 typedef struct _osrfWebsocketTranslator {
     const WebSocketServer *server;
-    apr_pool_t *main_pool; // standline per-process pool
+    apr_pool_t *main_pool; // standalone per-process pool
     apr_pool_t *session_pool; // child of trans->main_pool; per-session
     apr_hash_t *session_cache; 
     apr_thread_t *responder_thread;
@@ -83,6 +80,8 @@ typedef struct _osrfWebsocketTranslator {
 } osrfWebsocketTranslator;
 
 static osrfWebsocketTranslator *trans = NULL;
+static transport_client *osrf_handle = NULL;
+static char recipient_buf[RECIP_BUF_SIZE]; // reusable recipient buffer
 
 
 /**
@@ -92,38 +91,41 @@ static osrfWebsocketTranslator *trans = NULL;
  */
 void* APR_THREAD_FUNC osrf_responder_thread_main(apr_thread_t *thread, void *data) {
 
-    request_rec *r = trans->server->request(trans->server);
+    transport_message *tmsg;
     jsonObject *msg_wrapper;
     char *msg_string;
 
     while (1) {
 
-        transport_message *msg = client_recv(osrf_handle, -1);
-        if (!msg) continue; // early exit on interrupt
+        tmsg = client_recv(osrf_handle, -1);
+
+        if (!tmsg) continue; // early exit on interrupt
         
         // discard responses received after client disconnect
         if (!trans->client_connected) {
-            ap_log_rerror(APLOG_MARK, APLOG_DEBUG, 0, r, 
+            osrfLogDebug(OSRF_LOG_MARK, 
                 "WS discarding response for thread=%s, xid=%s", 
-                    msg->thread, msg->osrf_xid);
-            message_free(msg);                                                         
+                tmsg->thread, tmsg->osrf_xid);
+            message_free(tmsg);                                                         
             continue; 
         }
 
-        ap_log_rerror(APLOG_MARK, APLOG_DEBUG, 0, r, 
+
+        osrfLogDebug(OSRF_LOG_MARK, 
             "WS received opensrf response for thread=%s, xid=%s", 
-                msg->thread, msg->osrf_xid);
+                tmsg->thread, tmsg->osrf_xid);
 
         // build the wrapper object
         msg_wrapper = jsonNewObject(NULL);
-        jsonObjectSetKey(msg_wrapper, "thread", jsonNewObject(msg->thread));
-        jsonObjectSetKey(msg_wrapper, "log_xid", jsonNewObject(msg->osrf_xid));
-        jsonObjectSetKey(msg_wrapper, "osrf_msg", jsonParseRaw(msg->body));
+        jsonObjectSetKey(msg_wrapper, "thread", jsonNewObject(tmsg->thread));
+        jsonObjectSetKey(msg_wrapper, "log_xid", jsonNewObject(tmsg->osrf_xid));
+        jsonObjectSetKey(msg_wrapper, "osrf_msg", jsonParseRaw(tmsg->body));
 
-        if (msg->is_error) {
-            ap_log_rerror(APLOG_MARK, APLOG_ERR, 0, r, 
+        if (tmsg->is_error) {
+            fprintf(stderr,  
                 "WS received jabber error message in response to thread=%s and xid=%s", 
-                    msg->thread, msg->osrf_xid);
+                tmsg->thread, tmsg->osrf_xid);
+            fflush(stderr);
             jsonObjectSetKey(msg_wrapper, "transport_error", jsonNewBoolObject(1));
         }
 
@@ -135,21 +137,22 @@ void* APR_THREAD_FUNC osrf_responder_thread_main(apr_thread_t *thread, void *dat
 
         // capture the true message sender
         // TODO: this will grow to add one entry per client session.  
-        // need a last-touched timeout mechanism to periodically remove old entries
-        if (!apr_hash_get(trans->session_cache, msg->thread, APR_HASH_KEY_STRING)) {
+        // need to ensure that connected-sessions don't last /too/ long or create 
+        // a last-touched timeout mechanism to periodically remove old  entries
+        if (!apr_hash_get(trans->session_cache, tmsg->thread, APR_HASH_KEY_STRING)) {
 
-            ap_log_rerror(APLOG_MARK, APLOG_DEBUG, 0, r, 
-                "WS caching sender thread=%s, sender=%s", msg->thread, msg->sender);
+            osrfLogDebug(OSRF_LOG_MARK, 
+                "WS caching sender thread=%s, sender=%s", tmsg->thread, tmsg->sender);
 
             apr_hash_set(trans->session_cache, 
-                apr_pstrdup(trans->session_pool, msg->thread),
+                apr_pstrdup(trans->session_pool, tmsg->thread),
                 APR_HASH_KEY_STRING, 
-                apr_pstrdup(trans->session_pool, msg->sender));
+                apr_pstrdup(trans->session_pool, tmsg->sender));
         }
 
         free(msg_string);
         jsonObjectFree(msg_wrapper);
-        message_free(msg);                                                         
+        message_free(tmsg);                                                         
     }
 
     return NULL;
@@ -165,7 +168,7 @@ int child_init(const WebSocketServer *server) {
     apr_threadattr_t *thread_attr = NULL;
     request_rec *r = server->request(server);
         
-    ap_log_rerror(APLOG_MARK, APLOG_DEBUG, 0, r, "WS child_init");
+    osrfLogDebug(OSRF_LOG_MARK, "WS child_init");
 
     // osrf_handle will already be connected if this is not the first request
     // served by this process.
@@ -173,7 +176,7 @@ int child_init(const WebSocketServer *server) {
         char* config_file = "/openils/conf/opensrf_core.xml";
         char* config_ctx = "gateway"; //TODO config
         if (!osrfSystemBootstrapClientResc(config_file, config_ctx, "websocket")) {   
-            ap_log_rerror(APLOG_MARK, APLOG_ERR, 0, r,                              
+            osrfLogError(OSRF_LOG_MARK, 
                 "WS unable to bootstrap OpenSRF client with config %s", config_file); 
             return 1;
         }
@@ -183,7 +186,7 @@ int child_init(const WebSocketServer *server) {
 
     // create a standalone pool for our translator data
     if (apr_pool_create(&pool, NULL) != APR_SUCCESS) {
-        ap_log_rerror(APLOG_MARK, APLOG_ERR, 0, r, "WS Unable to create apr_pool");
+        osrfLogError(OSRF_LOG_MARK, "WS Unable to create apr_pool");
         return 1;
     }
 
@@ -193,7 +196,7 @@ int child_init(const WebSocketServer *server) {
         apr_palloc(pool, sizeof(osrfWebsocketTranslator));
 
     if (trans == NULL) {
-        ap_log_rerror(APLOG_MARK, APLOG_ERR, 0, r, "WS Unable to create translator");
+        osrfLogError(OSRF_LOG_MARK, "WS Unable to create translator");
         return 1;
     }
 
@@ -202,8 +205,8 @@ int child_init(const WebSocketServer *server) {
     trans->osrf_router = osrfConfigGetValue(NULL, "/router_name");                      
     trans->osrf_domain = osrfConfigGetValue(NULL, "/domain");
 
-    // Create the responder thread.  Once created, it runs for the lifetime
-    // of this process.
+    // Create the responder thread.  Once created, 
+    // it runs for the lifetime of this process.
     if ( (apr_threadattr_create(&thread_attr, trans->main_pool) == APR_SUCCESS) &&
          (apr_threadattr_detach_set(thread_attr, 0) == APR_SUCCESS) &&
          (apr_thread_create(&thread, thread_attr, 
@@ -212,8 +215,7 @@ int child_init(const WebSocketServer *server) {
         trans->responder_thread = thread;
         
     } else {
-        ap_log_rerror(APLOG_MARK, APLOG_ERR, 0, r, 
-            "WS unable to create responder thread");
+        osrfLogError(OSRF_LOG_MARK, "WS unable to create responder thread");
         return 1;
     }
 
@@ -227,8 +229,9 @@ void* CALLBACK on_connect_handler(const WebSocketServer *server) {
     request_rec *r = server->request(server);
     apr_pool_t *pool;
 
-    ap_log_rerror(APLOG_MARK, APLOG_DEBUG, 0, r, 
-        "WS connect from %s", r->connection->remote_ip);
+    osrfLogDebug(OSRF_LOG_MARK, 
+        "WS connect from %s", r->connection->remote_ip); 
+        //"WS connect from %s", r->connection->client_ip); // apache 2.4
 
     if (!trans) {
         if (child_init(server) != APR_SUCCESS) {
@@ -239,26 +242,18 @@ void* CALLBACK on_connect_handler(const WebSocketServer *server) {
     // create a standalone pool for the session cache values, which will be
     // destroyed on client disconnect.
     if (apr_pool_create(&pool, trans->main_pool) != APR_SUCCESS) {
-        ap_log_rerror(APLOG_MARK, APLOG_ERR, 0, r, 
-            "WS Unable to create apr_pool");
+        osrfLogError(OSRF_LOG_MARK, "WS Unable to create apr_pool");
         return NULL;
     }
 
     trans->session_pool = pool;
     trans->session_cache = apr_hash_make(trans->session_pool);
 
-    ap_log_rerror(APLOG_MARK, APLOG_ERR, 0, r, 
-        "WS created new pool %x", trans->session_pool);
-
     if (trans->session_cache == NULL) {
-        ap_log_rerror(APLOG_MARK, APLOG_ERR, 0, r, 
-            "WS unable to create session cache");
+        osrfLogError(OSRF_LOG_MARK, "WS unable to create session cache");
         return NULL;
     }
 
-    ap_log_rerror(APLOG_MARK, APLOG_ERR, 0, r, 
-        "WS created new hash %x", trans->session_cache);
-
     trans->client_connected = 1;
     return trans;
 }
@@ -285,8 +280,7 @@ static size_t CALLBACK on_message_handler(void *data,
 
     if (buffer_size <= 0) return OK;
 
-    ap_log_rerror(APLOG_MARK, APLOG_DEBUG, 0, r, 
-        "WS received message size=%d", buffer_size);
+    osrfLogDebug(OSRF_LOG_MARK, "WS received message size=%d", buffer_size);
 
     // buffer may not be \0-terminated, which jsonParse requires
     char buf[buffer_size + 1];
@@ -296,8 +290,7 @@ static size_t CALLBACK on_message_handler(void *data,
     msg_wrapper = jsonParseRaw(buf);
 
     if (msg_wrapper == NULL) {
-        ap_log_rerror(APLOG_MARK, 
-            APLOG_NOTICE, 0, r, "WS Invalid JSON: %s", buf);
+        osrfLogWarning(OSRF_LOG_MARK, "WS Invalid JSON: %s", buf);
         return HTTP_BAD_REQUEST;
     }
 
@@ -313,13 +306,17 @@ static size_t CALLBACK on_message_handler(void *data,
         log_xid = jsonObjectGetString(tmp_obj);
 
     if (log_xid) {
+
         // use the caller-provide log trace id
         if (strlen(log_xid) > MAX_THREAD_SIZE) {
-            ap_log_rerror(APLOG_MARK, APLOG_NOTICE, 
-                0, r, "WS log_xid exceeds max length");
+            osrfLogWarning(OSRF_LOG_MARK, "WS log_xid exceeds max length");
             return HTTP_BAD_REQUEST;
         }
-        osrfLogSetXid(log_xid); // TODO: make with with non-client
+
+        // TODO: make this work with non-client and make this call accept 
+        // const char*'s.  casting to (char*) for now to silence warnings.
+        osrfLogSetXid((char*) log_xid); 
+
     } else {
         // generate a new log trace id for this relay
         osrfLogMkXid();
@@ -328,8 +325,7 @@ static size_t CALLBACK on_message_handler(void *data,
     if (thread) {
 
         if (strlen(thread) > MAX_THREAD_SIZE) {
-            ap_log_rerror(APLOG_MARK, APLOG_NOTICE, 
-                0, r, "WS thread exceeds max length");
+            osrfLogWarning(OSRF_LOG_MARK, "WS thread exceeds max length");
             return HTTP_BAD_REQUEST;
         }
 
@@ -339,8 +335,7 @@ static size_t CALLBACK on_message_handler(void *data,
             trans->session_cache, thread, APR_HASH_KEY_STRING);
 
         if (recipient) {
-            ap_log_rerror(APLOG_MARK, APLOG_DEBUG, 0, r, 
-                "WS found cached recipient %s", recipient);
+            osrfLogDebug(OSRF_LOG_MARK, "WS found cached recipient %s", recipient);
         }
     }
 
@@ -353,14 +348,13 @@ static size_t CALLBACK on_message_handler(void *data,
             recipient = recipient_buf;
 
         } else {
-            ap_log_rerror(APLOG_MARK, APLOG_NOTICE, 
-                0, r, "WS Unable to determine recipient");
+            osrfLogWarning(OSRF_LOG_MARK, "WS Unable to determine recipient");
             return HTTP_BAD_REQUEST;
         }
     }
 
     // TODO: activity log entry? -- requires message analysis
-    ap_log_rerror(APLOG_MARK, APLOG_DEBUG, 0, r, 
+    osrfLogDebug(OSRF_LOG_MARK, 
         "WS relaying message thread=%s, xid=%s, recipient=%s", 
             thread, osrfLogGetXid(), recipient);
 
@@ -374,7 +368,7 @@ static size_t CALLBACK on_message_handler(void *data,
     osrfLogClearXid();
 
     message_free(tmsg);                                                         
-    free(msg_wrapper);
+    jsonObjectFree(msg_wrapper);
     free(msg_body);
 
     return OK;
@@ -396,14 +390,15 @@ void CALLBACK on_disconnect_handler(
     trans->session_cache = NULL;
 
     request_rec *r = server->request(server);
-    ap_log_rerror(APLOG_MARK, APLOG_DEBUG, 0, r, 
-        "WS disconnect from %s", r->connection->remote_ip);
+    osrfLogDebug(OSRF_LOG_MARK, 
+        "WS disconnect from %s", r->connection->remote_ip); 
+        //"WS disconnect from %s", r->connection->client_ip); // apache 2.4
 }
 
+/**
+ * Be nice and clean up our mess
+ */
 void CALLBACK on_destroy_handler(WebSocketPlugin *plugin) {
-    fprintf(stderr, "WS on_destroy_handler()\n");
-    fflush(stderr);
-
     if (trans) {
         apr_thread_exit(trans->responder_thread, APR_SUCCESS);
         apr_pool_destroy(trans->main_pool);