LP#1777180 Websocketd gateway and test scripts
authorBill Erickson <berickxx@gmail.com>
Sat, 9 Jun 2018 23:05:25 +0000 (19:05 -0400)
committerBen Shum <ben@evergreener.net>
Wed, 12 Sep 2018 15:27:33 +0000 (11:27 -0400)
Adds a new OpenSRF binary/program for relaying websockets messages
to and from a websocketd instance.  The new binary
(osrf-websocket-stdio) performs the same tasks as the
osrf_websocket_translator.c Apache module, minus the Apache module glue
and the extra threading required to run within the Apache module.

Commit includes 2 test scripts (tester.pl and test-stateful.pl) for
generating large series of test messages to send to a websockets
instance. tester.pl sends echo requests only, test-stateful.pl sends
connect->echo-request->disconnect batches across a configurable number
of forked processes.

INSTALL document updated to include websocketd setup as an alternative
to Apache websockets.

Signed-off-by: Bill Erickson <berickxx@gmail.com>
Signed-off-by: Jeff Davis <jeff.davis@bc.libraries.coop>
Signed-off-by: Jason Stephenson <jason@sigio.com>
Signed-off-by: Ben Shum <ben@evergreener.net>

.gitignore
README
configure.ac
src/Makefile.am
src/websocket-stdio/Makefile.am [new file with mode: 0644]
src/websocket-stdio/osrf-websocket-stdio.c [new file with mode: 0644]
src/websocket-stdio/test-stateful.pl [new file with mode: 0755]
src/websocket-stdio/tester.pl [new file with mode: 0755]

index 0170ebe..ac2a33e 100644 (file)
@@ -96,3 +96,7 @@ tests/check_transport_client
 tests/check_transport_message
 tests/Makefile
 tests/Makefile.in
+src/websocket-stdio/.deps/
+src/websocket-stdio/.libs/
+src/websocket-stdio/osrf-websocket-stdio
+src/websocket-stdio/*.o
diff --git a/README b/README
index 46c311e..2959e6e 100644 (file)
--- a/README
+++ b/README
@@ -417,8 +417,8 @@ srfsh# request opensrf.math add 2,2
 +
 You should receive the value `4`.
 
-Optional: Websockets installation instructions
-----------------------------------------------
+Websockets installation instructions: Option #1 Apache
+-------------------------------------------------------
 Websockets are new to OpenSRF 2.4+ and are required for operating the new web-based
 staff client for Evergreen.  Complete the following steps as the *root* Linux 
 account:
@@ -520,6 +520,36 @@ export OSRF_WEBSOCKET_MAX_REQUEST_WAIT_TIME=600
 /etc/init.d/apache2-websockets start
 ---------------------------------------------------------------------------
 
+Websockets installation instructions: Option #2 Websocketd
+----------------------------------------------------------
+
+1. Install websocketd (latest stable release from http://websocketd.com/)
++
+.(Debian, Ubuntu)
+[source,bash]
+---------------------------------------------------------------------------
+cd /tmp
+wget 'https://github.com/joewalnes/websocketd/releases/download/v0.3.0/websocketd-0.3.0-linux_amd64.zip'
+unzip websocketd-0.3.0-linux_amd64.zip
+sudo cp websocketd /usr/local/bin/
+---------------------------------------------------------------------------
++
+2. Run websocketd as 'opensrf'
++
+.(Debian, Ubuntu)
+[source,bash]
+---------------------------------------------------------------------------
+/usr/local/bin/websocketd --port 7682 /openils/bin/osrf-websocket-stdio &
+
+# Other useful command line parameters include:
+# --loglevel debug|trace|access|info|error|fatal
+# --maxforks <n>
+# --sameorigin=true
+# --origin=host[:port][,host[:port]...]
+
+# See https://github.com/joewalnes/websocketd/blob/master/help.go
+---------------------------------------------------------------------------
+
 Optional: Using NGINX as a proxy
 --------------------------------
 NGINX can be used to proxy HTTP, HTTPS, and WebSockets traffic. Among other
index 073d6f8..48ffab8 100644 (file)
@@ -387,6 +387,7 @@ if test "x$OSRF_INSTALL_CORE" = "xtrue"; then
                         src/python/opensrf.py
                         src/router/Makefile
                         src/srfsh/Makefile
+                        src/websocket-stdio/Makefile
              tests/Makefile
                         bin/opensrf-perl.pl
                         bin/osrf_config])
index a86281f..c8b6107 100644 (file)
@@ -40,7 +40,7 @@ js_SCRIPTS = javascript/DojoSRF.js javascript/JSON_v1.js javascript/md5.js javas
 endif
 
 if BUILDCORE
-MAYBE_CORE = libopensrf c-apps router srfsh gateway perl
+MAYBE_CORE = libopensrf c-apps router srfsh gateway perl websocket-stdio
 if BUILDPYTHON
 dist_bin_SCRIPTS = @top_srcdir@/bin/opensrf-perl.pl @top_srcdir@/src/python/opensrf.py @top_srcdir@/src/python/srfsh.py
 else
diff --git a/src/websocket-stdio/Makefile.am b/src/websocket-stdio/Makefile.am
new file mode 100644 (file)
index 0000000..b9e6ba9
--- /dev/null
@@ -0,0 +1,22 @@
+# Copyright (C) 2018 King County Library Service
+# Bill Erickson <berickxx@gmail.com>
+#
+# This program is free software; you can redistribute it and/or
+# modify it under the terms of the GNU General Public License
+# as published by the Free Software Foundation; either version 2
+# of the License, or (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+# GNU General Public License for more details.
+
+
+LDADD = $(DEF_LDLIBS) 
+AM_CFLAGS = $(DEF_CFLAGS) -DEXEC_DEFAULT -L@top_builddir@/src/libopensrf
+AM_LDFLAGS = $(DEF_LDFLAGS)
+
+DISTCLEANFILES = Makefile.in Makefile
+
+bin_PROGRAMS = osrf-websocket-stdio
+osrf_websocket_stdio_SOURCES = osrf-websocket-stdio.c
diff --git a/src/websocket-stdio/osrf-websocket-stdio.c b/src/websocket-stdio/osrf-websocket-stdio.c
new file mode 100644 (file)
index 0000000..96d62f1
--- /dev/null
@@ -0,0 +1,609 @@
+/* --------------------------------------------------------------------
+ * Copyright (C) 2018 King County Library Service
+ * Bill Erickson <berickxx@gmail.com>
+ *
+ * Code borrows heavily from osrf_websocket_translator.c
+ *
+ * This program is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU General Public License
+ * as published by the Free Software Foundation; either version 2
+ * of the License, or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ * GNU General Public License for more details.
+--------------------------------------------------------------------- */
+
+/**
+ * OpenSRF Websockets Relay
+ *
+ * Reads Websockets requests on STDIN
+ * Sends replies to requests on STDOUT
+ *
+ * Built to function with websocketd:
+ * https://github.com/joewalnes/websocketd
+ *
+ * Synopsis:
+ *
+ * websocketd --port 7682 --max-forks 250 ./osrf-websocket-stdio /path/to/opensrf_core.xml &
+ *
+ */
+
+#include <stdio.h>
+#include <unistd.h>
+#include <string.h>
+#include <signal.h>
+#include <opensrf/utils.h>
+#include <opensrf/osrf_hash.h>
+#include <opensrf/transport_client.h>
+#include <opensrf/osrf_message.h>
+#include <opensrf/osrf_app_session.h>
+#include <opensrf/log.h>
+
+#define MAX_THREAD_SIZE 64
+#define RECIP_BUF_SIZE 256
+#define WEBSOCKET_INGRESS "ws-translator-v2"
+
+// maximun number of active, CONNECTed opensrf sessions allowed. in
+// practice, this number will be very small, rarely reaching double
+// digits.  This is just a security back-stop.  A client trying to open
+// this many connections is almost certainly attempting to DOS the
+// gateway / server.
+#define MAX_ACTIVE_STATEFUL_SESSIONS 64
+
+// Message exceeding this size are discarded.
+// This value must be greater than RESET_MESSAGE_SIZE (below)
+// ~10M
+#define MAX_MESSAGE_SIZE 10485760
+
+// After processing any message this size or larger, free and
+// recreate the stdin buffer to release the memory.
+// ~100k
+#define RESET_MESSAGE_SIZE 102400
+
+// default values, replaced during setup (below) as needed.
+static char* config_file = "/openils/conf/opensrf_core.xml";
+static char* config_ctxt = "gateway";
+static char* osrf_router = NULL;
+static char* osrf_domain = NULL;
+
+// Cache of opensrf thread strings and back-end receipients.
+// Tracking this here means the caller only needs to track the thread.
+// It also means we don't have to expose internal XMPP IDs
+static osrfHash* stateful_session_cache = NULL;
+// Message on STDIN go into our reusable buffer
+static growing_buffer* stdin_buf = NULL;
+// OpenSRF XMPP connection handle
+static transport_client* osrf_handle = NULL;
+// Reusable string buf for recipient addresses
+static char recipient_buf[RECIP_BUF_SIZE];
+// Websocket client IP address (for logging)
+static char* client_ip = NULL;
+
+static void rebuild_stdin_buffer();
+static void child_init(int argc, char* argv[]);
+static void read_from_stdin();
+static void relay_stdin_message(const char*);
+static char* extract_inbound_messages();
+static void log_request(const char*, osrfMessage*);
+static void read_from_osrf();
+static void read_one_osrf_message(transport_message*);
+static int shut_it_down(int);
+static void release_hash_string(char*, void*);
+
+// Websocketd sends SIGINT for shutdown, followed by SIGTERM
+// if SIGINT takes too long.
+static void sigint_handler(int sig) {
+    osrfLogInfo(OSRF_LOG_MARK, "WS received SIGINT - graceful shutdown");
+    shut_it_down(0);
+}
+
+int main(int argc, char* argv[]) {
+
+    // Handle shutdown signal -- only needed once.
+    signal(SIGINT, sigint_handler);
+
+    // Connect to OpenSR -- exits on error
+    child_init(argc, argv);
+
+    // Disable output buffering.
+    setbuf(stdout, NULL);
+    rebuild_stdin_buffer();
+
+    // The main loop waits for data to be available on both STDIN
+    // (websocket client request) and the OpenSRF XMPP socket 
+    // (replies returning to the websocket client).
+    fd_set fds;
+    int stdin_no = fileno(stdin);
+    int osrf_no = osrf_handle->session->sock_id;
+    int maxfd = osrf_no > stdin_no ? osrf_no : stdin_no;
+    int sel_resp;
+
+    while (1) {
+
+        FD_ZERO(&fds);
+        FD_SET(osrf_no, &fds);
+        FD_SET(stdin_no, &fds);
+
+        // Wait indefinitely for activity to process
+        sel_resp = select(maxfd + 1, &fds, NULL, NULL, NULL);
+
+        if (sel_resp < 0) { // error
+
+            if (errno == EINTR) {
+                // Interrupted by a signal.  Start the loop over.
+                continue;
+            }
+
+            osrfLogError(OSRF_LOG_MARK,
+                "WS select() failed with [%s]. Exiting", strerror(errno));
+
+            shut_it_down(1);
+        }
+
+        if (FD_ISSET(stdin_no, &fds)) {
+            read_from_stdin();
+        }
+
+        if (FD_ISSET(osrf_no, &fds)) {
+            read_from_osrf();
+        }
+    }
+
+    return shut_it_down(0);
+}
+
+static void rebuild_stdin_buffer() {
+
+    if (stdin_buf != NULL) {
+        buffer_free(stdin_buf);
+    }
+
+    stdin_buf = buffer_init(1024);
+}
+
+static int shut_it_down(int stat) {
+    osrfHashFree(stateful_session_cache);
+    buffer_free(stdin_buf);
+    osrf_system_shutdown(); // clean XMPP disconnect
+    exit(stat);
+    return stat;
+}
+
+
+// Connect to OpenSRF/XMPP
+// Apply settings and command line args.
+static void child_init(int argc, char* argv[]) {
+
+    if (argc > 1) {
+        config_file = argv[1];
+    }
+
+    if (!osrf_system_bootstrap_client(config_file, config_ctxt) ) {
+        fprintf(stderr, "Cannot boostrap OSRF\n");
+        shut_it_down(1);
+    }
+
+       osrf_handle = osrfSystemGetTransportClient();
+       osrfAppSessionSetIngress(WEBSOCKET_INGRESS);
+
+    osrf_router = osrfConfigGetValue(NULL, "/router_name");
+    osrf_domain = osrfConfigGetValue(NULL, "/domain");
+
+    stateful_session_cache = osrfNewHash();
+    osrfHashSetCallback(stateful_session_cache, release_hash_string);
+
+    client_ip = getenv("REMOTE_ADDR");
+    osrfLogInfo(OSRF_LOG_MARK, "WS connect from %s", client_ip);
+}
+
+// Called by osrfHash when a string is removed.  We strdup each
+// string before it goes into the hash.
+static void release_hash_string(char* key, void* str) {
+    if (str == NULL) return;
+    free((char*) str);
+}
+
+
+// Relay websocket client messages from STDIN to OpenSRF.  Reads one
+// message then returns, allowing responses to intermingle with long
+// series of requests.
+static void read_from_stdin() {
+    char char_buf[1];
+    char c;
+
+    // Read one char at a time so we can stop at the first newline
+    // and leave any other data on the wire until read_from_stdin()
+    // is called again.
+
+    while (1) {
+        int stat = read(fileno(stdin), char_buf, 1);
+
+        if (stat < 0) {
+
+            if (errno == EAGAIN) {
+                // read interrupted.  Return to main loop to resume.
+                // Returning here will leave any in-progress message in
+                // the stdin_buf.  We return to the main select loop
+                // to confirm we really have more data to read and to
+                // perform additional error checking on the stream.
+                return;
+            }
+
+            // All other errors reading STDIN are considered fatal.
+            osrfLogError(OSRF_LOG_MARK,
+                "WS STDIN read failed with [%s]. Exiting", strerror(errno));
+            shut_it_down(1);
+            return;
+        }
+
+        if (stat == 0) { // EOF
+            osrfLogInfo(OSRF_LOG_MARK, "WS exiting on disconnect");
+            shut_it_down(0);
+            return;
+        }
+
+        c = char_buf[0];
+
+        if (c == '\n') { // end of current message
+
+            if (stdin_buf->n_used >= MAX_MESSAGE_SIZE) {
+                osrfLogError(OSRF_LOG_MARK,
+                    "WS message exceeded MAX_MESSAGE_SIZE, discarding");
+                rebuild_stdin_buffer();
+                return;
+            }
+
+            if (stdin_buf->n_used > 0) {
+                relay_stdin_message(stdin_buf->buf);
+
+                if (stdin_buf->n_used >= RESET_MESSAGE_SIZE) {
+                    // Current message is large.  Rebuild the buffer
+                    // to free the excess memory.
+                    rebuild_stdin_buffer();
+
+                } else {
+
+                    // Reset the buffer and carry on.
+                    buffer_reset(stdin_buf);
+                }
+            }
+
+            return;
+
+        } else {
+
+            if (stdin_buf->n_used >= MAX_MESSAGE_SIZE) {
+                // Message exceeds max message size.  Continue reading
+                // and discarding data.  NOTE: don't reset stdin_buf
+                // here becase we check n_used again once reading is done.
+                continue;
+            }
+
+            // Add the char to our current message buffer
+            buffer_add_char(stdin_buf, c);
+        }
+    }
+}
+
+// Relays a single websocket request to the OpenSRF/XMPP network.
+static void relay_stdin_message(const char* msg_string) {
+
+    jsonObject *msg_wrapper = NULL; // free me
+    const jsonObject *tmp_obj = NULL;
+    const jsonObject *osrf_msg = NULL;
+    const char *service = NULL;
+    const char *thread = NULL;
+    const char *log_xid = NULL;
+    char *msg_body = NULL;
+    char *recipient = NULL;
+
+    // generate a new log trace for this request. it
+    // may be replaced by a client-provided trace below.
+    osrfLogMkXid();
+
+    osrfLogInternal(OSRF_LOG_MARK, "WS received inbound message: %s", msg_string);
+
+    msg_wrapper = jsonParse(msg_string);
+
+    if (msg_wrapper == NULL) {
+        osrfLogWarning(OSRF_LOG_MARK, "WS Invalid JSON: %s", msg_string);
+        return;
+    }
+
+    osrf_msg = jsonObjectGetKeyConst(msg_wrapper, "osrf_msg");
+
+    if ( (tmp_obj = jsonObjectGetKeyConst(msg_wrapper, "service")) )
+        service = jsonObjectGetString(tmp_obj);
+
+    if ( (tmp_obj = jsonObjectGetKeyConst(msg_wrapper, "thread")) )
+        thread = jsonObjectGetString(tmp_obj);
+
+    if ( (tmp_obj = jsonObjectGetKeyConst(msg_wrapper, "log_xid")) )
+        log_xid = jsonObjectGetString(tmp_obj);
+
+    if (log_xid) {
+
+        // use the caller-provide log trace id
+        if (strlen(log_xid) > MAX_THREAD_SIZE) {
+            osrfLogWarning(OSRF_LOG_MARK, "WS log_xid exceeds max length");
+            return;
+        }
+
+        osrfLogForceXid(log_xid);
+    }
+
+    if (thread) {
+
+        if (strlen(thread) > MAX_THREAD_SIZE) {
+            osrfLogWarning(OSRF_LOG_MARK, "WS thread exceeds max length");
+            return;
+        }
+
+        // since clients can provide their own threads at session start time,
+        // the presence of a thread does not guarantee a cached recipient
+        recipient = (char*) osrfHashGet(stateful_session_cache, thread);
+
+        if (recipient) {
+            osrfLogDebug(OSRF_LOG_MARK, "WS found cached recipient %s", recipient);
+        }
+    }
+
+    if (!recipient) {
+
+        if (service) {
+            int size = snprintf(recipient_buf, RECIP_BUF_SIZE - 1,
+                "%s@%s/%s", osrf_router, osrf_domain, service);
+            recipient_buf[size] = '\0';
+            recipient = recipient_buf;
+
+        } else {
+            osrfLogWarning(OSRF_LOG_MARK, "WS Unable to determine recipient");
+            return;
+        }
+    }
+
+    osrfLogDebug(OSRF_LOG_MARK,
+        "WS relaying message to opensrf thread=%s, recipient=%s",
+            thread, recipient);
+
+    // 'recipient' will be freed in extract_inbound_messages
+    // during a DISCONNECT call.  Retain a local copy.
+    recipient = strdup(recipient);
+
+    msg_body = extract_inbound_messages(service, thread, osrf_msg);
+
+    osrfLogInternal(OSRF_LOG_MARK,
+        "WS relaying inbound message: %s", msg_body);
+
+    transport_message *tmsg = message_init(
+        msg_body, NULL, thread, recipient, NULL);
+
+    free(recipient);
+
+    message_set_osrf_xid(tmsg, osrfLogGetXid());
+
+    if (client_send_message(osrf_handle, tmsg) != 0) {
+        osrfLogError(OSRF_LOG_MARK, "WS failed sending data to OpenSRF, exiting");
+        shut_it_down(1);
+    }
+
+    osrfLogClearXid();
+    message_free(tmsg);
+    jsonObjectFree(msg_wrapper);
+    free(msg_body);
+}
+
+// Turn the OpenSRF message JSON into a set of osrfMessage's for 
+// analysis, ingress application, and logging.
+static char* extract_inbound_messages(
+        const char* service, const char* thread, const jsonObject *osrf_msg) {
+
+    int i;
+    int num_msgs = osrf_msg->size;
+    osrfMessage* msg;
+    osrfMessage* msg_list[num_msgs];
+
+    // here we do an extra json round-trip to get the data
+    // in a form osrf_message_deserialize can understand
+    // TODO: consider a version of osrf_message_init which can
+    // accept a jsonObject* instead of a JSON string.
+    char *osrf_msg_json = jsonObjectToJSON(osrf_msg);
+    osrf_message_deserialize(osrf_msg_json, msg_list, num_msgs);
+    free(osrf_msg_json);
+
+    // should we require the caller to always pass the service?
+    if (service == NULL) service = "";
+
+    for (i = 0; i < num_msgs; i++) {
+        msg = msg_list[i];
+        osrfMessageSetIngress(msg, WEBSOCKET_INGRESS);
+
+        switch (msg->m_type) {
+
+            case CONNECT:
+                break;
+
+            case REQUEST:
+                log_request(service, msg);
+                break;
+
+            case DISCONNECT:
+                osrfHashRemove(stateful_session_cache, thread);
+                break;
+
+            default:
+                osrfLogError(OSRF_LOG_MARK, "WS received unexpected message "
+                    "type from WebSocket client: %d", msg->m_type);
+                break;
+        }
+    }
+
+    char* finalMsg = osrfMessageSerializeBatch(msg_list, num_msgs);
+
+    // clean up our messages
+    for (i = 0; i < num_msgs; i++)
+        osrfMessageFree(msg_list[i]);
+
+    return finalMsg;
+}
+
+// All REQUESTs are logged as activity.
+static void log_request(const char* service, osrfMessage* msg) {
+
+    const jsonObject* params = msg->_params;
+    growing_buffer* act = buffer_init(128);
+    char* method = msg->method_name;
+    const jsonObject* obj = NULL;
+    int i = 0;
+    const char* str;
+    int redactParams = 0;
+
+    buffer_fadd(act, "[%s] [%s] %s %s", client_ip, "", service, method);
+
+    while ( (str = osrfStringArrayGetString(log_protect_arr, i++)) ) {
+        if (!strncmp(method, str, strlen(str))) {
+            redactParams = 1;
+            break;
+        }
+    }
+
+    if (redactParams) {
+        OSRF_BUFFER_ADD(act, " **PARAMS REDACTED**");
+    } else {
+        i = 0;
+        while ((obj = jsonObjectGetIndex(params, i++))) {
+            char* str = jsonObjectToJSON(obj);
+            if (i == 1)
+                OSRF_BUFFER_ADD(act, " ");
+            else
+                OSRF_BUFFER_ADD(act, ", ");
+            OSRF_BUFFER_ADD(act, str);
+            free(str);
+        }
+    }
+
+    osrfLogActivity(OSRF_LOG_MARK, "%s", act->buf);
+    buffer_free(act);
+}
+
+
+
+// Relay response messages from OpenSRF to STDIN
+// Relays all available messages
+static void read_from_osrf() {
+    transport_message* tmsg = NULL;
+
+    // Double check the socket connection before continuing.
+    if (!client_connected(osrf_handle) ||
+        !socket_connected(osrf_handle->session->sock_id)) {
+        osrfLogWarning(OSRF_LOG_MARK,
+            "WS: Jabber socket disconnected, exiting");
+        shut_it_down(1);
+    }
+
+    // Once client_recv is called all data waiting on the socket is
+    // read.  This means we can't return to the main select() loop after
+    // each message, because any subsequent messages will get stuck in
+    // the opensrf receive queue. Process all available messages.
+    while ( (tmsg = client_recv(osrf_handle, 0)) ) {
+        read_one_osrf_message(tmsg);
+        message_free(tmsg);
+    }
+}
+
+// Process a single OpenSRF response message and print the reponse
+// to STDOUT for delivery to the websocket client.
+static void read_one_osrf_message(transport_message* tmsg) {
+    osrfList *msg_list = NULL;
+    osrfMessage *one_msg = NULL;
+    int i;
+
+    osrfLogDebug(OSRF_LOG_MARK,
+        "WS received opensrf response for thread=%s", tmsg->thread);
+
+    // first we need to perform some maintenance
+    msg_list = osrfMessageDeserialize(tmsg->body, NULL);
+
+    for (i = 0; i < msg_list->size; i++) {
+        one_msg = OSRF_LIST_GET_INDEX(msg_list, i);
+
+        osrfLogDebug(OSRF_LOG_MARK,
+            "WS returned response of type %d", one_msg->m_type);
+
+        /*  if our client just successfully connected to an opensrf service,
+            cache the sender so that future calls on this thread will use
+            the correct recipient. */
+        if (one_msg && one_msg->m_type == STATUS) {
+
+            if (one_msg->status_code == OSRF_STATUS_OK) {
+
+                if (!osrfHashGet(stateful_session_cache, tmsg->thread)) {
+
+                    unsigned long ses_size =
+                        osrfHashGetCount(stateful_session_cache);
+
+                    if (ses_size < MAX_ACTIVE_STATEFUL_SESSIONS) {
+
+                        osrfLogDebug(OSRF_LOG_MARK, "WS caching sender "
+                            "thread=%s, sender=%s; concurrent=%d",
+                            tmsg->thread, tmsg->sender, ses_size);
+
+                        char* sender = strdup(tmsg->sender); // free in *Remove
+                        osrfHashSet(stateful_session_cache, sender, tmsg->thread);
+
+                    } else {
+
+                        osrfLogWarning(OSRF_LOG_MARK,
+                            "WS max concurrent sessions (%d) reached.  "
+                            "Current session will not be tracked",
+                            MAX_ACTIVE_STATEFUL_SESSIONS
+                        );
+                    }
+                }
+
+            } else {
+
+                // connection timed out; clear the cached recipient
+                if (one_msg->status_code == OSRF_STATUS_TIMEOUT) {
+                    osrfHashRemove(stateful_session_cache, tmsg->thread);
+                }
+            }
+        }
+    }
+
+    // osrfMessageDeserialize applies the freeItem handler to the
+    // newly created osrfList.  We only need to free the list and
+    // the individual osrfMessage's will be freed along with it
+    osrfListFree(msg_list);
+
+    // Pack the response into a websocket wrapper message.
+    jsonObject *msg_wrapper = NULL;
+    char *msg_string = NULL;
+    msg_wrapper = jsonNewObject(NULL);
+
+    jsonObjectSetKey(msg_wrapper, "thread", jsonNewObject(tmsg->thread));
+    jsonObjectSetKey(msg_wrapper, "log_xid", jsonNewObject(tmsg->osrf_xid));
+    jsonObjectSetKey(msg_wrapper, "osrf_msg", jsonParseRaw(tmsg->body));
+
+    if (tmsg->is_error) {
+        // tmsg->sender is the original recipient. they get swapped
+        // in error replies.
+        osrfLogError(OSRF_LOG_MARK,
+            "WS received XMPP error message in response to thread=%s and "
+            "recipient=%s.  Likely the recipient is not accessible/available.",
+            tmsg->thread, tmsg->sender);
+        jsonObjectSetKey(msg_wrapper, "transport_error", jsonNewBoolObject(1));
+    }
+
+    msg_string = jsonObjectToJSONRaw(msg_wrapper);
+
+    // Send the JSON to STDOUT
+    printf("%s\n", msg_string);
+
+    free(msg_string);
+    jsonObjectFree(msg_wrapper);
+}
+
+
diff --git a/src/websocket-stdio/test-stateful.pl b/src/websocket-stdio/test-stateful.pl
new file mode 100755 (executable)
index 0000000..0487d5e
--- /dev/null
@@ -0,0 +1,148 @@
+#!/usr/bin/perl
+# --------------------------------------------------------------------------
+# Copyright (C) 2018 King County Library Service
+# Bill Erickson <berickxx@gmail.com>
+# 
+# This program is free software; you can redistribute it and/or
+# modify it under the terms of the GNU General Public License
+# as published by the Free Software Foundation; either version 2
+# of the License, or (at your option) any later version.
+# 
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+# GNU General Public License for more details.
+# --------------------------------------------------------------------------
+#
+# Synopsis:
+#
+# $ sudo cpan Net::Async::WebSocket::Client; 
+# $ sudo cpan IO::Async::SSL;
+# $ time perl test-stateful.pl wss://localhost:443/osrf-websocket-translator
+#
+# --------------------------------------------------------------------------
+use strict;
+use warnings;
+use IO::Async::Loop;
+use Net::Async::WebSocket::Client;
+use OpenSRF::Utils::JSON;
+
+my $fork_count = 5;
+my $batch_size = 1000;
+
+# allow the script to run easily on test VMs.
+use IO::Socket::SSL;
+IO::Socket::SSL::set_ctx_defaults(SSL_verify_mode => 0);
+
+package StatefulBatch;
+
+sub new {
+    my $class = shift;
+    my $self = {
+        client => undef,
+        loop => undef,
+        thread => undef,
+        sent_count => 0,
+        in_connect => 0
+    };
+
+    return bless($self, $class);
+}
+
+sub send_connect {
+    my $self = shift;
+    $self->{in_connect} = 1;
+
+    my $thread = $self->{thread} = rand(); # reset on connect
+    my $msg = <<MSG;
+    {"service":"open-ils.auth","thread":"$thread","osrf_msg":[{"__c":"osrfMessage","__p":{"threadTrace":"0","locale":"en-US","tz":"America/New_York","type":"CONNECT"}}]}
+MSG
+
+    $self->{client}->send_text_frame($msg);
+}
+
+sub send_request {
+    my $self = shift;
+    $self->{in_connect} = 0;
+
+    my $thread = $self->{thread};
+    my $msg = <<MSG;
+{"service":"open-ils.auth","thread":"$thread","osrf_msg":[{"__c":"osrfMessage","__p":{"threadTrace":1,"type":"REQUEST","payload":{"__c":"osrfMethod","__p":{"method":"opensrf.system.echo","params":["EC asldi asldif asldfia sdflias dflasdif alsdif asldfias dlfiasd flasidf alsdif alsdif asldfia sldfias dlfias dflaisd flasidf lasidf alsdif asldif asldif asldif asldif asldif asldfia sldfia sdlfias dlfias dfliasd flasidf lasidf alsdif asldif alsdif asldif asldif aslidf alsdif alsidf alsdif asldif asldif asldif asldif asldif asldif alsdif alsdif alsidf alsidf alsdif asldif asldif asldfi asldfi asldif asldif asldfi asldfias ldfaisdf lasidf alsdif asldif asldfi asdlfias dHO ME"]}},"locale":"en-US","tz":"America/New_York","api_level":1}}]}
+MSG
+
+    $self->{client}->send_text_frame($msg);
+}
+
+sub send_disconnect {
+    my $self = shift;
+
+    my $thread = $self->{thread};
+    my $msg = <<MSG;
+    {"service":"open-ils.auth","thread":"$thread","osrf_msg":[{"__c":"osrfMessage","__p":{"threadTrace":"2","locale":"en-US","tz":"America/New_York","type":"DISCONNECT"}}]}
+MSG
+    $self->{client}->send_text_frame($msg);
+}
+
+
+sub on_message {
+    my ($self, $frame) = @_;
+
+    my $msg = OpenSRF::Utils::JSON->JSON2perl($frame);
+    my $type = $msg->{osrf_msg}->[0]->{type};
+
+    if ($self->{in_connect}) {
+        my $msg = OpenSRF::Utils::JSON->JSON2perl($frame);
+        if ($type ne 'STATUS') {
+            die "Received unexpected message type: $type : $frame\n";
+        }
+        $self->send_request;
+
+    } else {
+
+        if ($type ne 'RESULT') {
+            die "Received unexpected message type: $type : $frame\n";
+        }
+
+        # disconnect messages do not return replies
+        $self->send_disconnect;
+
+        print "[$$] completed ".$self->{sent_count} . " of $batch_size\n";
+
+        if ($self->{sent_count}++ >= $batch_size) {
+            $self->{loop}->stop;
+            return;
+        }
+
+        $self->send_connect;
+    }
+}
+
+package main;
+
+my $url = $ARGV[0] or die "WS URL REQUIRED\n";
+
+
+for (1..$fork_count) {
+
+    if (fork() == 0) {
+        my $tester = StatefulBatch->new;
+
+        $tester->{client} = Net::Async::WebSocket::Client->new(
+            on_text_frame => sub {
+                my ($client, $frame) = @_;
+                $tester->on_message($frame);
+            }
+        );
+
+        $tester->{loop} = IO::Async::Loop->new;
+        $tester->{loop}->add($tester->{client});
+        $tester->{client}->connect(
+            url => $url, on_connected => sub {$tester->send_connect});
+        $tester->{loop}->run;
+        exit(0);
+    }
+}
+
+# exit after all children have exited
+while (wait() > -1) {}
+
diff --git a/src/websocket-stdio/tester.pl b/src/websocket-stdio/tester.pl
new file mode 100755 (executable)
index 0000000..75db284
--- /dev/null
@@ -0,0 +1,87 @@
+#!/usr/bin/perl
+# --------------------------------------------------------------------------
+# Copyright (C) 2018 King County Library Service
+# Bill Erickson <berickxx@gmail.com>
+# 
+# This program is free software; you can redistribute it and/or
+# modify it under the terms of the GNU General Public License
+# as published by the Free Software Foundation; either version 2
+# of the License, or (at your option) any later version.
+# 
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+# GNU General Public License for more details.
+# --------------------------------------------------------------------------
+#
+# Synopsis:
+#
+# $ sudo cpan Net::Async::WebSocket::Client; 
+# $ sudo cpan IO::Async::SSL;
+# $ time perl tester.pl wss://localhost:443/osrf-websocket-translator
+#
+# --------------------------------------------------------------------------
+use strict;
+use warnings;
+use IO::Async::Loop;
+use Net::Async::WebSocket::Client;
+
+# allow the script to run easily on test VMs.
+use IO::Socket::SSL;
+IO::Socket::SSL::set_ctx_defaults(SSL_verify_mode => 0);
+
+my $client;
+my $loop;
+my $send_batches = 1000;
+my $batches_sent = 0;
+my $send_wanted = 5; # per batch
+my $send_count = 0;
+my $recv_count = 0;
+
+sub send_one_msg {
+       my $thread = rand();
+    $send_count++;
+
+       my $osrf_msg = <<MSG;
+{"service":"open-ils.auth","thread":"$thread","osrf_msg":[{"__c":"osrfMessage","__p":{"threadTrace":0,"type":"REQUEST","payload":{"__c":"osrfMethod","__p":{"method":"opensrf.system.echo","params":["EC asldi asldif asldfia sdflias dflasdif alsdif asldfias dlfiasd flasidf alsdif alsdif asldfia sldfias dlfias dflaisd flasidf lasidf alsdif asldif asldif asldif asldif asldif asldfia sldfia sdlfias dlfias dfliasd flasidf lasidf alsdif asldif alsdif asldif asldif aslidf alsdif alsidf alsdif asldif asldif asldif asldif asldif asldif alsdif alsdif alsidf alsidf alsdif asldif asldif asldfi asldfi asldif asldif asldfi asldfias ldfaisdf lasidf alsdif asldif asldfi asdlfias dHO ME"]}},"locale":"en-US","tz":"America/New_York","api_level":1}}]}
+MSG
+
+       $client->send_text_frame($osrf_msg);
+    print "batch=$batches_sent sent=$send_count received=$recv_count\n";
+}
+
+my $on_message = sub {
+    my ($self, $frame) = @_;
+    $recv_count++;
+    print "batch=$batches_sent sent=$send_count received=$recv_count\n";
+
+    if ($send_count == $send_wanted && $send_count == $recv_count) {
+        # once every request in the current batch has received
+        # a reply, kick off a new batch.
+        send_next_batch();
+    }
+};
+
+sub send_next_batch {
+
+    if ($batches_sent == $send_batches) {
+        $loop->stop;
+        return;
+    }
+
+    $batches_sent++;
+    $send_count = 0;
+    $recv_count = 0;
+    for (1..$send_wanted) {
+        send_one_msg();
+    }
+}
+
+my $url = $ARGV[0] or die "WS URL REQUIRED\n";
+
+$client = Net::Async::WebSocket::Client->new(on_text_frame => $on_message);
+$loop = IO::Async::Loop->new;
+$loop->add($client);
+$client->connect(url => $url, on_connected => sub {send_next_batch()});
+$loop->run;
+