LP#1268619: websockets: gateway code repairs & confing options
authorBill Erickson <berick@esilibrary.com>
Tue, 11 Mar 2014 21:25:19 +0000 (17:25 -0400)
committerGalen Charlton <gmc@esilibrary.com>
Tue, 19 Aug 2014 22:54:46 +0000 (15:54 -0700)
* avoid unneccessary and wrong incantation of apr_thread_exit.  The two
  sub-threads now both live for the duration of the process.
* to be safe, create thread mutex before threads

Signed-off-by: Bill Erickson <berick@esilibrary.com>
Signed-off-by: Galen Charlton <gmc@esilibrary.com>

README.websockets
src/gateway/osrf_websocket_translator.c

index 69a56c0..15f38b1 100644 (file)
@@ -23,10 +23,11 @@ Websockets installation instructions for Debian
 
 # OPTIONAL: add these configuration variables to
 # /etc/apache2-websockets/envvars and adjust as needed.
-# export OSRF_WEBSOCKET_IDLE_TIMEOUT=60
+# export OSRF_WEBSOCKET_IDLE_TIMEOUT=120
 # export OSRF_WEBSOCKET_IDLE_CHECK_INTERVAL=5
 # export OSRF_WEBSOCKET_CONFIG_FILE=/openils/conf/opensrf_core.xml
 # export OSRF_WEBSOCKET_CONFIG_CTXT=gateway
+# export OSRF_WEBSOCKET_MAX_REQUEST_WAIT_TIME=600
 #
 # IDLE_TIMEOUT specifies how long we will allow a client to stay connected
 # while idle.  A longer timeout means less network traffic (from fewer
@@ -36,6 +37,12 @@ Websockets installation instructions for Debian
 # IDLE_CHECK_INTERVAL specifies how often we wake to check the idle status
 # of the connected client.
 #
+# MAX_REQUEST_WAIT_TIME is the maximum amount of time the gateway will
+# wait before declaring a client as idle when there is a long-running
+# outstanding request, yet no other activity is occurring.  This is
+# primarily a fail-safe to allow idle timeouts when one or more requests
+# died on the server, and thus no response was ever delivered to the gateway.
+#
 # Both specified in seconds
 #
 # CONFIG_FILE / CTXT are the standard opensrf core config options.
index 5b9d607..ef8d4af 100644 (file)
 #include "opensrf/osrfConfig.h"
 
 #define MAX_THREAD_SIZE 64
-#define RECIP_BUF_SIZE 128
+#define RECIP_BUF_SIZE 256
 #define WEBSOCKET_TRANSLATOR_INGRESS "ws-translator-v1"
 
+// maximun number of active, CONNECTed opensrf sessions allowed. in
+// practice, this number will be very small, rarely reaching double
+// digits.  This is just a security back-stop.  A client trying to open
+// this many connections is almost certainly attempting to DOS the
+// gateway / server.  We may want to lower this further.
+#define MAX_ACTIVE_STATEFUL_SESSIONS 128
 
 // default values, replaced during setup (below) as needed.
 static char* config_file = "/openils/conf/opensrf_core.xml";
 static char* config_ctxt = "gateway";
-static time_t idle_timeout_interval = 60; 
+
+static time_t idle_timeout_interval = 120; 
 static time_t idle_check_interval = 5;
 static time_t last_activity_time = 0;
 
+// Generally, we do not disconnect the client (as idle) if there is a
+// request in flight.  However, we need to have an upper bound on the
+// amount of time we will wait for in-flight requests to complete to
+// avoid leaving an effectively idle connection open after a request
+// died on the backend and no response was received.
+// Note that if other activity occurs while a long-running request
+// is active, the wait time will get reset with each new activity. 
+// This is OK, though, because the goal of max_request_wait_time
+// is not to chop requests off at the knees, it's to allow the client
+// to timeout as idle when only a single long-running request is active
+// and preventing timeout.
+static time_t max_request_wait_time = 600;
+
+// Incremented with every REQUEST, decremented with every COMPLETE.
+// Gives us a rough picture of the number of reqests we've sent to 
+// the server vs. the number for which a completed response has been 
+// received.
+static int requests_in_flight = 0;
+
 // true if we've received a signal to start graceful shutdown
 static int shutdown_requested = 0; 
 static void sigusr1_handler(int sig);
@@ -130,16 +156,18 @@ typedef struct _osrfWebsocketTranslator {
      * map internally means the caller never need know about
      * internal XMPP addresses and the server doesn't have to 
      * verify caller-specified recipient addresses.  It's
-     * all managed internally.
+     * all managed internally.  This is only used for stateful
+     * (CONNECT'ed) session.  Stateless sessions need not 
+     * track the recipient, since they are one-off calls.
      */
-    apr_hash_t *session_cache; 
+    apr_hash_t *stateful_session_cache; 
 
     /**
-     * session_pool contains the key/value pairs stored in
-     * the session_cache.  The pool is regularly destroyed
+     * stateful_session_pool contains the key/value pairs stored in
+     * the stateful_session_cache.  The pool is regularly destroyed
      * and re-created to avoid long-term memory consumption
      */
-    apr_pool_t *session_pool;
+    apr_pool_t *stateful_session_pool;
 
     /**
      * Thread responsible for collecting responses on the opensrf
@@ -181,31 +209,25 @@ static char recipient_buf[RECIP_BUF_SIZE]; // reusable recipient buffer
 
 static void clear_cached_recipient(const char* thread) {
     apr_pool_t *pool = NULL;                                                
+    request_rec *r = trans->server->request(trans->server);
 
-    if (apr_hash_get(trans->session_cache, thread, APR_HASH_KEY_STRING)) {
+    if (apr_hash_get(trans->stateful_session_cache, thread, APR_HASH_KEY_STRING)) {
 
         osrfLogDebug(OSRF_LOG_MARK, "WS removing cached recipient on disconnect");
 
         // remove it from the hash
-        apr_hash_set(trans->session_cache, thread, APR_HASH_KEY_STRING, NULL);
-
-        if (apr_hash_count(trans->session_cache) == 0) {
-            osrfLogDebug(OSRF_LOG_MARK, "WS re-setting session_pool");
-
-            // memory accumulates in the session_pool as sessions are cached then 
-            // un-cached.  Un-caching removes strings from the hash, but not the 
-            // pool itself.  That only happens when the pool is destroyed. Here
-            // we destroy the session pool to clear any lingering memory, then
-            // re-create it for future caching.
-            apr_pool_destroy(trans->session_pool);
-    
-            if (apr_pool_create(&pool, NULL) != APR_SUCCESS) {
-                osrfLogError(OSRF_LOG_MARK, "WS Unable to create session_pool");
-                trans->session_pool = NULL;
-                return;
-            }
-
-            trans->session_pool = pool;
+        apr_hash_set(trans->stateful_session_cache, thread, APR_HASH_KEY_STRING, NULL);
+
+        if (apr_hash_count(trans->stateful_session_cache) == 0) {
+            osrfLogDebug(OSRF_LOG_MARK, "WS re-setting stateful_session_pool");
+
+            // memory accumulates in the stateful_session_pool as
+            // sessions are cached then un-cached.  Un-caching removes
+            // strings from the hash, but not from the pool.  Clear the
+            // pool here. note: apr_pool_clear does not free memory, it
+            // reclaims it for use again within the pool.  This is more
+            // effecient than freeing and allocating every time.
+            apr_pool_clear(trans->stateful_session_pool);
         }
     }
 }
@@ -233,30 +255,44 @@ void* osrf_responder_thread_main_body(transport_message *tmsg) {
             the correct recipient. */
         if (one_msg && one_msg->m_type == STATUS) {
 
+            if (one_msg->status_code == OSRF_STATUS_OK) {
 
-            // only cache recipients if the client is still connected
-            if (trans->client_connected && 
-                    one_msg->status_code == OSRF_STATUS_OK) {
-
-                if (!apr_hash_get(trans->session_cache, 
+                if (!apr_hash_get(trans->stateful_session_cache, 
                         tmsg->thread, APR_HASH_KEY_STRING)) {
 
-                    osrfLogDebug(OSRF_LOG_MARK, 
-                        "WS caching sender thread=%s, sender=%s", 
-                        tmsg->thread, tmsg->sender);
+                    apr_size_t ses_size = 
+                        apr_hash_count(trans->stateful_session_cache);
 
-                    apr_hash_set(trans->session_cache, 
-                        apr_pstrdup(trans->session_pool, tmsg->thread),
-                        APR_HASH_KEY_STRING, 
-                        apr_pstrdup(trans->session_pool, tmsg->sender));
+                    if (ses_size < MAX_ACTIVE_STATEFUL_SESSIONS) {
+
+                        osrfLogDebug(OSRF_LOG_MARK, "WS caching sender "
+                            "thread=%s, sender=%s; concurrent=%d", 
+                            tmsg->thread, tmsg->sender, ses_size);
+
+                        apr_hash_set(trans->stateful_session_cache, 
+                            apr_pstrdup(trans->stateful_session_pool, tmsg->thread),
+                            APR_HASH_KEY_STRING, 
+                            apr_pstrdup(trans->stateful_session_pool, tmsg->sender));
+
+                    } else {
+                        osrfLogWarning(OSRF_LOG_MARK, 
+                            "WS max concurrent sessions (%d) reached.  "
+                            "Current session will not be tracked",
+                            MAX_ACTIVE_STATEFUL_SESSIONS
+                        );
+                    }
                 }
 
             } else {
 
                 // connection timed out; clear the cached recipient
-                // regardless of whether the client is still connected
-                if (one_msg->status_code == OSRF_STATUS_TIMEOUT)
+                if (one_msg->status_code == OSRF_STATUS_TIMEOUT) {
                     clear_cached_recipient(tmsg->thread);
+
+                } else {
+                    if (one_msg->status_code == OSRF_STATUS_COMPLETE)
+                        requests_in_flight--;
+                }
             }
         }
     }
@@ -265,16 +301,7 @@ void* osrf_responder_thread_main_body(transport_message *tmsg) {
     // newly created osrfList.  We only need to free the list and 
     // the individual osrfMessage's will be freed along with it
     osrfListFree(msg_list);
-
-    if (!trans->client_connected) {
-
-        osrfLogInfo(OSRF_LOG_MARK, 
-            "WS discarding response for thread=%s", tmsg->thread);
-
-        return;
-    }
     
-    // client is still connected. 
     // relay the response messages to the client
     jsonObject *msg_wrapper = NULL;
     char *msg_string = NULL;
@@ -303,9 +330,67 @@ void* osrf_responder_thread_main_body(transport_message *tmsg) {
 }
 
 /**
+ * Responder thread main body.
+ * Collects responses from the opensrf network and relays them to the 
+ * websocket caller.
+ */
+void* APR_THREAD_FUNC osrf_responder_thread_main(apr_thread_t *thread, void *data) {
+
+    transport_message *tmsg;
+    while (1) {
+
+        if (apr_thread_mutex_unlock(trans->mutex) != APR_SUCCESS) {
+            osrfLogError(OSRF_LOG_MARK, "WS error un-locking thread mutex");
+            return NULL;
+        }
+
+        // wait for a response
+        tmsg = client_recv(osrf_handle, -1);
+
+        if (!tmsg) continue; // interrupt
+
+        if (trans->client_connected) {
+
+            if (apr_thread_mutex_lock(trans->mutex) != APR_SUCCESS) {
+                osrfLogError(OSRF_LOG_MARK, "WS error locking thread mutex");
+                return NULL;
+            }
+
+            osrfLogForceXid(tmsg->osrf_xid);
+            osrf_responder_thread_main_body(tmsg);
+            last_activity_time = time(NULL);
+        }
+
+        message_free(tmsg);                                                         
+    }
+
+    return NULL;
+}
+
+static int active_connection_count() {
+
+    if (requests_in_flight) {
+
+        time_t now = time(NULL);
+        time_t difference = now - last_activity_time;
+
+        if (difference >= max_request_wait_time) {
+            osrfLogWarning(OSRF_LOG_MARK, 
+                "%d In-flight request(s) took longer than %d seconds "
+                "to complete.  Treating request as dead and moving on.",
+                requests_in_flight, 
+                max_request_wait_time
+            );
+            requests_in_flight = 0;
+        }
+    }
+
+    return requests_in_flight;
+}
+
+/**
  * Sleep and regularly wake to see if the process has been idle for too
  * long.  If so, send a disconnect to the client.
- *
  */
 void* APR_THREAD_FUNC osrf_idle_timeout_thread_main(
         apr_thread_t *thread, void *data) {
@@ -327,7 +412,7 @@ void* APR_THREAD_FUNC osrf_idle_timeout_thread_main(
         // During graceful shtudown, we may wait up to 
         // idle_check_interval seconds before initiating shutdown.
         sleep(sleep_time);
-        
+
         if (apr_thread_mutex_lock(trans->mutex) != APR_SUCCESS) {
             osrfLogError(OSRF_LOG_MARK, "WS error locking thread mutex");
             return NULL;
@@ -339,8 +424,8 @@ void* APR_THREAD_FUNC osrf_idle_timeout_thread_main(
             continue;
         }
 
-        // do we have any active conversations with the connected client?
-        int active_count = apr_hash_count(trans->session_cache);
+        // do we have any active stateful conversations with the client?
+        int active_count = active_connection_count();
 
         if (active_count) {
 
@@ -385,7 +470,7 @@ void* APR_THREAD_FUNC osrf_idle_timeout_thread_main(
             time_t difference = now - last_activity_time;
 
             osrfLogDebug(OSRF_LOG_MARK, 
-                "WS has been idle for %d seconds", difference);
+                "WS connection idle for %d seconds", difference);
 
             if (difference < idle_timeout_interval) {
                 // Last activity occurred within the idle timeout interval.
@@ -412,40 +497,90 @@ void* APR_THREAD_FUNC osrf_idle_timeout_thread_main(
     return NULL;
 }
 
-/**
- * Responder thread main body.
- * Collects responses from the opensrf network and relays them to the 
- * websocket caller.
- */
-void* APR_THREAD_FUNC osrf_responder_thread_main(apr_thread_t *thread, void *data) {
+static int build_startup_data(const WebSocketServer *server) {
 
-    transport_message *tmsg;
-    while (1) {
+    apr_pool_t *main_pool = NULL;                                                
+    apr_pool_t *stateful_session_pool = NULL;                                                
+    apr_thread_t *thread = NULL;
+    apr_threadattr_t *thread_attr = NULL;
+    apr_thread_mutex_t *mutex = NULL;
+    request_rec *r = server->request(server);
 
-        if (apr_thread_mutex_unlock(trans->mutex) != APR_SUCCESS) {
-            osrfLogError(OSRF_LOG_MARK, "WS error un-locking thread mutex");
-            return NULL;
-        }
+    // create a pool for our translator data
+    // Do not use r->pool as the parent, since r->pool will be freed
+    // when the current client disconnects.
+    if (apr_pool_create(&main_pool, NULL) != APR_SUCCESS) {
+        osrfLogError(OSRF_LOG_MARK, "WS Unable to create apr_pool");
+        return 1;
+    }
 
-        // wait for a response
-        tmsg = client_recv(osrf_handle, -1);
+    trans = (osrfWebsocketTranslator*) 
+        apr_palloc(main_pool, sizeof(osrfWebsocketTranslator));
 
-        if (!tmsg) continue; // early exit on interrupt
+    if (trans == NULL) {
+        osrfLogError(OSRF_LOG_MARK, "WS Unable to create translator");
+        return 1;
+    }
 
-        if (apr_thread_mutex_lock(trans->mutex) != APR_SUCCESS) {
-            osrfLogError(OSRF_LOG_MARK, "WS error locking thread mutex");
-            return NULL;
-        }
+    trans->server = server;
+    trans->main_pool = main_pool;
+    trans->osrf_router = osrfConfigGetValue(NULL, "/router_name");                      
+    trans->osrf_domain = osrfConfigGetValue(NULL, "/domain");
 
-        osrfLogForceXid(tmsg->osrf_xid);
-        osrf_responder_thread_main_body(tmsg);
-        message_free(tmsg);                                                         
-        last_activity_time = time(NULL);
+    // opensrf session / recipient cache
+    trans->stateful_session_cache = apr_hash_make(trans->main_pool);
+    if (trans->stateful_session_cache == NULL) {
+        osrfLogError(OSRF_LOG_MARK, "WS unable to create session cache");
+        return 1;
     }
 
-    return NULL;
-}
+    // opensrf session / recipient string pool; cleared regularly
+    // the only data entering this pools are the session strings.
+    if (apr_pool_create(&stateful_session_pool, trans->main_pool) != APR_SUCCESS) {
+        osrfLogError(OSRF_LOG_MARK, "WS Unable to create apr_pool");
+        return NULL;
+    }
+    trans->stateful_session_pool = stateful_session_pool;
+
+    if (apr_thread_mutex_create(
+            &mutex, APR_THREAD_MUTEX_UNNESTED, 
+            trans->main_pool) != APR_SUCCESS) {
+        osrfLogError(OSRF_LOG_MARK, "WS unable to create thread mutex");
+        return 1;
+    }
+    trans->mutex = mutex;
+
+    // responder thread
+    if ( (apr_threadattr_create(&thread_attr, trans->main_pool) == APR_SUCCESS) &&
+         (apr_threadattr_detach_set(thread_attr, 0) == APR_SUCCESS) &&
+         (apr_thread_create(&thread, thread_attr, 
+                osrf_responder_thread_main, trans, trans->main_pool) == APR_SUCCESS)) {
+
+        trans->responder_thread = thread;
+        
+    } else {
+        osrfLogError(OSRF_LOG_MARK, "WS unable to create responder thread");
+        return 1;
+    }
+
+    // idle timeout thread
+    thread = NULL; // reset
+    thread_attr = NULL; // reset
+    if ( (apr_threadattr_create(&thread_attr, trans->main_pool) == APR_SUCCESS) &&
+         (apr_threadattr_detach_set(thread_attr, 0) == APR_SUCCESS) &&
+         (apr_thread_create(&thread, thread_attr, 
+            osrf_idle_timeout_thread_main, trans, trans->main_pool) == APR_SUCCESS)) {
+
+        osrfLogDebug(OSRF_LOG_MARK, "WS created idle timeout thread");
+        trans->idle_timeout_thread = thread;
+        
+    } else {
+        osrfLogError(OSRF_LOG_MARK, "WS unable to create idle timeout thread");
+        return 1;
+    }
 
+    return APR_SUCCESS;
+}
 
 
 /**
@@ -453,14 +588,8 @@ void* APR_THREAD_FUNC osrf_responder_thread_main(apr_thread_t *thread, void *dat
  * session cache and session pool.
  */
 int child_init(const WebSocketServer *server) {
-
-    apr_pool_t *pool = NULL;                                                
-    apr_thread_t *thread = NULL;
-    apr_threadattr_t *thread_attr = NULL;
-    apr_thread_mutex_t *mutex = NULL;
     request_rec *r = server->request(server);
 
-
     // osrf_handle will already be connected if this is not the first request
     // served by this process.
     if ( !(osrf_handle = osrfSystemGetTransportClient()) ) {
@@ -479,6 +608,21 @@ int child_init(const WebSocketServer *server) {
         ap_log_rerror(APLOG_MARK, APLOG_DEBUG, 0, r, 
             "WS: timeout set to %d", idle_timeout_interval);
 
+        timeout = getenv("OSRF_WEBSOCKET_MAX_REQUEST_WAIT_TIME");
+        if (timeout) {
+            if (!atoi(timeout)) {
+                ap_log_rerror(APLOG_MARK, APLOG_ERR, 0, r, 
+                    "WS: invalid OSRF_WEBSOCKET_MAX_REQUEST_WAIT_TIME: %s", 
+                    timeout
+                );
+            } else {
+                max_request_wait_time = (time_t) atoi(timeout);
+            }
+        }
+
+        ap_log_rerror(APLOG_MARK, APLOG_DEBUG, 0, r, 
+            "WS: max request wait time set to %d", max_request_wait_time);
+
         char* interval = getenv("OSRF_WEBSOCKET_IDLE_CHECK_INTERVAL");
         if (interval) {
             if (!atoi(interval)) {
@@ -524,76 +668,7 @@ int child_init(const WebSocketServer *server) {
         osrf_handle = osrfSystemGetTransportClient();
     }
 
-    // create a standalone pool for our translator data
-    if (apr_pool_create(&pool, NULL) != APR_SUCCESS) {
-        osrfLogError(OSRF_LOG_MARK, "WS Unable to create apr_pool");
-        return 1;
-    }
-
-    // allocate our static translator instance
-    trans = (osrfWebsocketTranslator*) 
-        apr_palloc(pool, sizeof(osrfWebsocketTranslator));
-
-    if (trans == NULL) {
-        osrfLogError(OSRF_LOG_MARK, "WS Unable to create translator");
-        return 1;
-    }
-
-    trans->main_pool = pool;
-    trans->server = server;
-    trans->osrf_router = osrfConfigGetValue(NULL, "/router_name");                      
-    trans->osrf_domain = osrfConfigGetValue(NULL, "/domain");
-
-    trans->session_cache = apr_hash_make(pool);
-
-    if (trans->session_cache == NULL) {
-        osrfLogError(OSRF_LOG_MARK, "WS unable to create session cache");
-        return 1;
-    }
-
-    // Create the responder thread.  Once created, 
-    // it runs for the lifetime of this process.
-    if ( (apr_threadattr_create(&thread_attr, trans->main_pool) == APR_SUCCESS) &&
-         (apr_threadattr_detach_set(thread_attr, 0) == APR_SUCCESS) &&
-         (apr_thread_create(&thread, thread_attr, 
-                osrf_responder_thread_main, trans, trans->main_pool) == APR_SUCCESS)) {
-
-        trans->responder_thread = thread;
-        
-    } else {
-        osrfLogError(OSRF_LOG_MARK, "WS unable to create responder thread");
-        return 1;
-    }
-
-    // Create the idle timeout thread, which lives for the lifetime
-    // of the process.
-    thread = NULL; // reset
-    thread_attr = NULL; // reset
-    if ( (apr_threadattr_create(&thread_attr, trans->main_pool) == APR_SUCCESS) &&
-         (apr_threadattr_detach_set(thread_attr, 0) == APR_SUCCESS) &&
-         (apr_thread_create(&thread, thread_attr, 
-            osrf_idle_timeout_thread_main, trans, trans->main_pool) == APR_SUCCESS)) {
-
-        osrfLogDebug(OSRF_LOG_MARK, "WS created idle timeout thread");
-        trans->idle_timeout_thread = thread;
-        
-    } else {
-        osrfLogError(OSRF_LOG_MARK, "WS unable to create idle timeout thread");
-        return 1;
-    }
-
-
-    if (apr_thread_mutex_create(
-            &mutex, APR_THREAD_MUTEX_UNNESTED, 
-            trans->main_pool) != APR_SUCCESS) {
-        osrfLogError(OSRF_LOG_MARK, "WS unable to create thread mutex");
-        return 1;
-    }
-
-    trans->mutex = mutex;
-
     signal(SIGUSR1, sigusr1_handler);
-
     return APR_SUCCESS;
 }
 
@@ -602,32 +677,23 @@ int child_init(const WebSocketServer *server) {
  */
 void* CALLBACK on_connect_handler(const WebSocketServer *server) {
     request_rec *r = server->request(server);
-    apr_pool_t *pool;
-    apr_thread_t *thread = NULL;
-    apr_threadattr_t *thread_attr = NULL;
 
-    const char* client_ip = get_client_ip(r);
-    osrfLogInfo(OSRF_LOG_MARK, "WS connect from %s", client_ip);
+    if (!trans) { // first connection
 
-    if (!trans) {
-        // first connection
-        if (child_init(server) != APR_SUCCESS) {
+        // connect to opensrf
+        if (child_init(server) != APR_SUCCESS)
             return NULL;
-        }
-    }
 
-    // create a standalone pool for the session cache values
-    // this pool will be destroyed and re-created regularly
-    // to clear session memory
-    if (apr_pool_create(&pool, r->pool) != APR_SUCCESS) {
-        osrfLogError(OSRF_LOG_MARK, "WS Unable to create apr_pool");
-        return NULL;
+        // build pools, thread data, and the translator
+        if (build_startup_data(server) != APR_SUCCESS)
+            return NULL;
     }
 
-    trans->session_pool = pool;
-    trans->client_connected = 1;
-    last_activity_time = time(NULL);
+    const char* client_ip = get_client_ip(r);
+    osrfLogInfo(OSRF_LOG_MARK, "WS connect from %s", client_ip);
 
+    last_activity_time = time(NULL);
+    trans->client_connected = 1;
     return trans;
 }
 
@@ -702,6 +768,7 @@ static char* extract_inbound_messages(
                 }
                 osrfLogActivity(OSRF_LOG_MARK, "%s", act->buf);
                 buffer_free(act);
+                requests_in_flight++;
                 break;
             }
 
@@ -793,7 +860,7 @@ static size_t on_message_handler_body(void *data,
         // since clients can provide their own threads at session start time,
         // the presence of a thread does not guarantee a cached recipient
         recipient = (char*) apr_hash_get(
-            trans->session_cache, thread, APR_HASH_KEY_STRING);
+            trans->stateful_session_cache, thread, APR_HASH_KEY_STRING);
 
         if (recipient) {
             osrfLogDebug(OSRF_LOG_MARK, "WS found cached recipient %s", recipient);
@@ -837,7 +904,6 @@ static size_t on_message_handler_body(void *data,
     free(msg_body);
 
     last_activity_time = time(NULL);
-
     return OK;
 }
 
@@ -867,46 +933,26 @@ static size_t CALLBACK on_message_handler(void *data,
 void CALLBACK on_disconnect_handler(
     void *data, const WebSocketServer *server) {
 
-    osrfWebsocketTranslator *trans = (osrfWebsocketTranslator*) data;
+    // if the threads wake up during disconnect, this tells 
+    // them to go back to sleep.
     trans->client_connected = 0;
 
-    // timeout thread is recreated w/ each new connection
-    apr_thread_exit(trans->idle_timeout_thread, APR_SUCCESS);
-    trans->idle_timeout_thread = NULL;
-    
-    // ensure no errant session data is sticking around
-    apr_hash_clear(trans->session_cache);
-
-    // strictly speaking, this pool will get destroyed when
-    // r->pool is destroyed, but it doesn't hurt to explicitly
-    // destroy it ourselves.
-    apr_pool_destroy(trans->session_pool);
-    trans->session_pool = NULL;
-
     request_rec *r = server->request(server);
-
     osrfLogInfo(OSRF_LOG_MARK, "WS disconnect from %s", get_client_ip(r)); 
-}
-
-/**
- * Be nice and clean up our mess
- */
-void CALLBACK on_destroy_handler(WebSocketPlugin *plugin) {
-    if (trans) {
-        apr_thread_exit(trans->responder_thread, APR_SUCCESS);
-        apr_thread_mutex_destroy(trans->mutex);
-        if (trans->session_pool)
-            apr_pool_destroy(trans->session_pool);
-        apr_pool_destroy(trans->main_pool);
-    }
 
-    trans = NULL;
+    // Clear any lingering session data
+    // NOTE: we could apr_pool_destroy the stateful_session_pool to truly free
+    // the memory, but since there is a limit to the size of the pool
+    // (max_concurrent_sessions), the memory cannot grow unbounded, 
+    // so there's no need.
+    apr_hash_clear(trans->stateful_session_cache);
+    apr_pool_clear(trans->stateful_session_pool);
 }
 
 static WebSocketPlugin osrf_websocket_plugin = {
     sizeof(WebSocketPlugin),
     WEBSOCKET_PLUGIN_VERSION_0,
-    on_destroy_handler,
+    NULL, // on_destroy_handler
     on_connect_handler,
     on_message_handler,
     on_disconnect_handler