1 /* -----------------------------------------------------------------------
2 * Copyright 2012 Equinox Software, Inc.
3 * Bill Erickson <berick@esilibrary.com>
5 * This program is free software; you can redistribute it and/or
6 * modify it under the terms of the GNU General Public License
7 * as published by the Free Software Foundation; either version 2
8 * of the License, or (at your option) any later version.
10 * This program is distributed in the hope that it will be useful,
11 * but WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 * GNU General Public License for more details.
14 * -----------------------------------------------------------------------
18 * websocket <-> opensrf gateway. Wrapped opensrf messages are extracted
19 * and relayed to the opensrf network. Responses are pulled from the opensrf
20 * network and passed back to the client. Messages are analyzed to determine
21 * when a connect/disconnect occurs, so that the cache of recipients can be
22 * properly managed. We also activity-log REQUEST messages.
24 * Messages to/from the websocket client take the following form:
26 * "service" : "opensrf.foo", // required
27 * "thread" : "123454321", // AKA thread. required for follow-up requests; max 64 chars.
28 * "log_xid" : "123..32", // optional log trace ID, max 64 chars;
29 * "osrf_msg" : [<osrf_msg>, <osrf_msg>, ...] // required
32 * Each translator operates with three threads. One thread receives messages
33 * from the websocket client, translates, and relays them to the opensrf
34 * network. The second thread collects responses from the opensrf network and
35 * relays them back to the websocket client. The third thread inspects
36 * the idle timeout interval to see if it's time to drop the idle client.
38 * After the initial setup, all thread actions occur within a thread
39 * mutex. The desired effect is a non-threaded application that uses
40 * threads for the sole purpose of having one thread listening for
41 * incoming data, while a second thread listens for responses, and a
42 * third checks the idle timeout. When any thread awakens, it's the
43 * only thread in town until it goes back to sleep (i.e. listening on
44 * its socket for data).
46 * Note that with the opensrf "thread", which allows us to identify the
47 * opensrf session, the caller does not need to provide a recipient
48 * address. The "service" is only required to start a new opensrf
49 * session. After the session is started, all future communication is
50 * based solely on the thread. However, the "service" should be passed
51 * by the caller for all requests to ensure it is properly logged in the
 * activity log.
54 * Every inbound and outbound message updates the last_activity_time.
55 * A separate thread wakes periodically to see if the time since the
56 * last_activity_time exceeds the configured idle_timeout_interval. If
57 * so, a disconnect is sent to the client, completing the conversation.
59 * Configuration goes directly into the Apache envvars file.
60 * (e.g. /etc/apache2-websockets/envvars). As of today, it's not
61 * possible to leverage Apache configuration directives directly,
62 * since this is not an Apache module, but a shared library loaded
63 * by an apache module. This includes SetEnv / SetEnvIf.
65 * export OSRF_WEBSOCKET_IDLE_TIMEOUT=300
66 * export OSRF_WEBSOCKET_IDLE_CHECK_INTERVAL=5
67 * export OSRF_WEBSOCKET_CONFIG_FILE=/openils/conf/opensrf_core.xml
68 * export OSRF_WEBSOCKET_CONFIG_CTXT=gateway
 */
#include <signal.h>
#include <stdlib.h>
#include <string.h>
#include <time.h>
#include <unistd.h>

#include "apr_strings.h"
#include "apr_thread_proc.h"

#include "websocket_plugin.h"
#include "opensrf/log.h"
#include "opensrf/osrf_json.h"
#include "opensrf/transport_client.h"
#include "opensrf/transport_message.h"
#include "opensrf/osrf_system.h"
#include "opensrf/osrfConfig.h"
// Maximum accepted length of a client-supplied "thread" or "log_xid".
#define MAX_THREAD_SIZE 64
// Size of the reusable buffer holding a router recipient address.
#define RECIP_BUF_SIZE 256
// Ingress value stamped on every inbound opensrf message.
#define WEBSOCKET_TRANSLATOR_INGRESS "ws-translator-v1"

// Maximum number of active, CONNECTed opensrf sessions allowed. In
// practice, this number will be very small, rarely reaching double
// digits. This is just a security back-stop. A client trying to open
// this many connections is almost certainly attempting to DOS the
// gateway / server. We may want to lower this further.
#define MAX_ACTIVE_STATEFUL_SESSIONS 128

// Default values, replaced during setup (below) as needed.
// The pointers are const-qualified because the defaults are string
// literals, which must never be written through.
static const char* config_file = "/openils/conf/opensrf_core.xml";
static const char* config_ctxt = "gateway";

// All intervals are in seconds.
static time_t idle_timeout_interval = 120;
static time_t idle_check_interval = 5;
static time_t last_activity_time = 0;

// Generally, we do not disconnect the client (as idle) if there is a
// request in flight. However, we need to have an upper bound on the
// amount of time we will wait for in-flight requests to complete to
// avoid leaving an effectively idle connection open after a request
// died on the backend and no response was received.
// Note that if other activity occurs while a long-running request
// is active, the wait time will get reset with each new activity.
// This is OK, though, because the goal of max_request_wait_time
// is not to chop requests off at the knees, it's to allow the client
// to timeout as idle when only a single long-running request is active
// and preventing timeout.
static time_t max_request_wait_time = 600;

// Incremented with every REQUEST, decremented with every COMPLETE.
// Gives us a rough picture of the number of requests we've sent to
// the server vs. the number for which a completed response has been
// received.
static int requests_in_flight = 0;
125 // true if we've received a signal to start graceful shutdown
126 static int shutdown_requested = 0;
127 static void sigusr1_handler(int sig);
128 static void sigusr1_handler(int sig) {
129 shutdown_requested = 1;
130 signal(SIGUSR1, sigusr1_handler);
131 osrfLogInfo(OSRF_LOG_MARK, "WS received SIGUSR1 - Graceful Shutdown");
134 static const char* get_client_ip(const request_rec* r) {
136 return r->connection->client_ip;
138 return r->connection->remote_ip;
142 typedef struct _osrfWebsocketTranslator {
144 /** Our handle for communicating with the caller */
145 const WebSocketServer *server;
148 * Standalone, per-process APR pool. Primarily
149 * there for managing thread data, which lasts
150 * the duration of the process.
152 apr_pool_t *main_pool;
155 * Map of thread => drone-xmpp-address. Maintaining this
156 * map internally means the caller never need know about
157 * internal XMPP addresses and the server doesn't have to
158 * verify caller-specified recipient addresses. It's
159 * all managed internally. This is only used for stateful
160 * (CONNECT'ed) session. Stateless sessions need not
161 * track the recipient, since they are one-off calls.
163 apr_hash_t *stateful_session_cache;
166 * stateful_session_pool contains the key/value pairs stored in
167 * the stateful_session_cache. The pool is regularly destroyed
168 * and re-created to avoid long-term memory consumption
170 apr_pool_t *stateful_session_pool;
173 * Thread responsible for collecting responses on the opensrf
174 * network and relaying them back to the caller
176 apr_thread_t *responder_thread;
179 * Thread responsible for checking inactivity timeout.
180 * If no activitity occurs within the configured interval,
181 * a disconnect is sent to the client and the connection
184 apr_thread_t *idle_timeout_thread;
187 * All message handling code is wrapped in a thread mutex such
188 * that all actions (after the initial setup) are serialized
189 * to minimize the possibility of multi-threading snafus.
191 apr_thread_mutex_t *mutex;
194 * True if a websocket client is currently connected
196 int client_connected;
198 /** OpenSRF jouter name */
201 /** OpenSRF domain */
204 } osrfWebsocketTranslator;
206 static osrfWebsocketTranslator *trans = NULL;
207 static transport_client *osrf_handle = NULL;
208 static char recipient_buf[RECIP_BUF_SIZE]; // reusable recipient buffer
210 static void clear_cached_recipient(const char* thread) {
211 apr_pool_t *pool = NULL;
212 request_rec *r = trans->server->request(trans->server);
214 if (apr_hash_get(trans->stateful_session_cache, thread, APR_HASH_KEY_STRING)) {
216 osrfLogDebug(OSRF_LOG_MARK, "WS removing cached recipient on disconnect");
218 // remove it from the hash
219 apr_hash_set(trans->stateful_session_cache, thread, APR_HASH_KEY_STRING, NULL);
221 if (apr_hash_count(trans->stateful_session_cache) == 0) {
222 osrfLogDebug(OSRF_LOG_MARK, "WS re-setting stateful_session_pool");
224 // memory accumulates in the stateful_session_pool as
225 // sessions are cached then un-cached. Un-caching removes
226 // strings from the hash, but not from the pool. Clear the
227 // pool here. note: apr_pool_clear does not free memory, it
228 // reclaims it for use again within the pool. This is more
229 // effecient than freeing and allocating every time.
230 apr_pool_clear(trans->stateful_session_pool);
235 void* osrf_responder_thread_main_body(transport_message *tmsg) {
237 osrfList *msg_list = NULL;
238 osrfMessage *one_msg = NULL;
241 osrfLogDebug(OSRF_LOG_MARK,
242 "WS received opensrf response for thread=%s", tmsg->thread);
244 // first we need to perform some maintenance
245 msg_list = osrfMessageDeserialize(tmsg->body, NULL);
247 for (i = 0; i < msg_list->size; i++) {
248 one_msg = OSRF_LIST_GET_INDEX(msg_list, i);
250 osrfLogDebug(OSRF_LOG_MARK,
251 "WS returned response of type %d", one_msg->m_type);
253 /* if our client just successfully connected to an opensrf service,
254 cache the sender so that future calls on this thread will use
255 the correct recipient. */
256 if (one_msg && one_msg->m_type == STATUS) {
258 if (one_msg->status_code == OSRF_STATUS_OK) {
260 if (!apr_hash_get(trans->stateful_session_cache,
261 tmsg->thread, APR_HASH_KEY_STRING)) {
263 apr_size_t ses_size =
264 apr_hash_count(trans->stateful_session_cache);
266 if (ses_size < MAX_ACTIVE_STATEFUL_SESSIONS) {
268 osrfLogDebug(OSRF_LOG_MARK, "WS caching sender "
269 "thread=%s, sender=%s; concurrent=%d",
270 tmsg->thread, tmsg->sender, ses_size);
272 apr_hash_set(trans->stateful_session_cache,
273 apr_pstrdup(trans->stateful_session_pool, tmsg->thread),
275 apr_pstrdup(trans->stateful_session_pool, tmsg->sender));
278 osrfLogWarning(OSRF_LOG_MARK,
279 "WS max concurrent sessions (%d) reached. "
280 "Current session will not be tracked",
281 MAX_ACTIVE_STATEFUL_SESSIONS
288 // connection timed out; clear the cached recipient
289 if (one_msg->status_code == OSRF_STATUS_TIMEOUT) {
290 clear_cached_recipient(tmsg->thread);
293 if (one_msg->status_code == OSRF_STATUS_COMPLETE)
294 requests_in_flight--;
300 // osrfMessageDeserialize applies the freeItem handler to the
301 // newly created osrfList. We only need to free the list and
302 // the individual osrfMessage's will be freed along with it
303 osrfListFree(msg_list);
305 // relay the response messages to the client
306 jsonObject *msg_wrapper = NULL;
307 char *msg_string = NULL;
309 // build the wrapper object
310 msg_wrapper = jsonNewObject(NULL);
311 jsonObjectSetKey(msg_wrapper, "thread", jsonNewObject(tmsg->thread));
312 jsonObjectSetKey(msg_wrapper, "log_xid", jsonNewObject(tmsg->osrf_xid));
313 jsonObjectSetKey(msg_wrapper, "osrf_msg", jsonParseRaw(tmsg->body));
315 if (tmsg->is_error) {
316 osrfLogError(OSRF_LOG_MARK,
317 "WS received jabber error message in response to thread=%s",
319 jsonObjectSetKey(msg_wrapper, "transport_error", jsonNewBoolObject(1));
322 msg_string = jsonObjectToJSONRaw(msg_wrapper);
324 // drop the JSON on the outbound wire
325 trans->server->send(trans->server, MESSAGE_TYPE_TEXT,
326 (unsigned char*) msg_string, strlen(msg_string));
329 jsonObjectFree(msg_wrapper);
333 * Responder thread main body.
334 * Collects responses from the opensrf network and relays them to the
337 void* APR_THREAD_FUNC osrf_responder_thread_main(apr_thread_t *thread, void *data) {
339 transport_message *tmsg;
342 if (apr_thread_mutex_unlock(trans->mutex) != APR_SUCCESS) {
343 osrfLogError(OSRF_LOG_MARK, "WS error un-locking thread mutex");
347 // wait indefinitely for a response
348 tmsg = client_recv(osrf_handle, -1);
351 // tmsg can only be NULL if the underlying select() call is
352 // interrupted or the jabber socket connection was severed.
354 if (client_connected(osrf_handle) &&
355 socket_connected(osrf_handle->session->sock_id)) {
356 continue; // interrupted. restart loop.
359 // Socket connection was broken. Send disconnect to client,
360 // causing on_disconnect_handler to run and cleanup.
361 osrfLogWarning(OSRF_LOG_MARK,
362 "WS: Jabber socket disconnected. Sending close() to client");
364 trans->server->close(trans->server);
365 return NULL; // exit thread
368 if (trans->client_connected) {
370 if (apr_thread_mutex_lock(trans->mutex) != APR_SUCCESS) {
371 osrfLogError(OSRF_LOG_MARK, "WS error locking thread mutex");
375 osrfLogForceXid(tmsg->osrf_xid);
376 osrf_responder_thread_main_body(tmsg);
377 last_activity_time = time(NULL);
386 static int active_connection_count() {
388 if (requests_in_flight) {
390 time_t now = time(NULL);
391 time_t difference = now - last_activity_time;
393 if (difference >= max_request_wait_time) {
394 osrfLogWarning(OSRF_LOG_MARK,
395 "%d In-flight request(s) took longer than %d seconds "
396 "to complete. Treating request as dead and moving on.",
398 max_request_wait_time
400 requests_in_flight = 0;
404 return requests_in_flight;
408 * Sleep and regularly wake to see if the process has been idle for too
409 * long. If so, send a disconnect to the client.
411 void* APR_THREAD_FUNC osrf_idle_timeout_thread_main(
412 apr_thread_t *thread, void *data) {
414 // sleep time defaults to the check interval, but may
415 // be shortened during shutdown.
416 int sleep_time = idle_check_interval;
417 int shutdown_loops = 0;
421 if (apr_thread_mutex_unlock(trans->mutex) != APR_SUCCESS) {
422 osrfLogError(OSRF_LOG_MARK, "WS error un-locking thread mutex");
426 // note: receiving a signal (e.g. SIGUSR1) will not interrupt
427 // this sleep(), since it's running within its own thread.
428 // During graceful shtudown, we may wait up to
429 // idle_check_interval seconds before initiating shutdown.
432 if (apr_thread_mutex_lock(trans->mutex) != APR_SUCCESS) {
433 osrfLogError(OSRF_LOG_MARK, "WS error locking thread mutex");
437 // no client is connected. reset sleep time go back to sleep.
438 if (!trans->client_connected) {
439 sleep_time = idle_check_interval;
443 // do we have any active stateful conversations with the client?
444 int active_count = active_connection_count();
448 if (shutdown_requested) {
449 // active conversations means we can't shut down.
450 // shorten the check interval to re-check more often.
452 osrfLogDebug(OSRF_LOG_MARK,
453 "WS: %d active conversation(s) found in shutdown after "
454 "%d attempts. Sleeping...", shutdown_loops, active_count
457 if (shutdown_loops > 30) {
458 // this is clearly a long-running conversation, let's
459 // check less frequently to avoid excessive logging.
466 // active conversations means keep going. There's no point in
467 // checking the idle time (below) if we're mid-conversation
471 // no active conversations
473 if (shutdown_requested) {
474 // there's no need to reset the shutdown vars (loops/requested)
475 // SIGUSR1 is Apaches reload signal, which means this process
476 // will be going away as soon as the client is disconnected.
478 osrfLogInfo(OSRF_LOG_MARK,
479 "WS: no active conversations remain in shutdown; "
480 "closing client connection");
483 // see how long we've been idle. If too long, kick the client
485 time_t now = time(NULL);
486 time_t difference = now - last_activity_time;
488 osrfLogDebug(OSRF_LOG_MARK,
489 "WS connection idle for %d seconds", difference);
491 if (difference < idle_timeout_interval) {
492 // Last activity occurred within the idle timeout interval.
496 // idle timeout exceeded
497 osrfLogDebug(OSRF_LOG_MARK,
498 "WS: idle timeout exceeded. now=%d / last=%d; "
499 "closing client connection", now, last_activity_time);
503 // send a disconnect to the client, which will come back around
504 // to cause our on_disconnect_handler to run.
505 osrfLogDebug(OSRF_LOG_MARK, "WS: sending close() to client");
506 trans->server->close(trans->server);
508 // client will be going away, reset sleep time
509 sleep_time = idle_check_interval;
512 // should never get here
516 static int build_startup_data(const WebSocketServer *server) {
518 apr_pool_t *main_pool = NULL;
519 apr_pool_t *stateful_session_pool = NULL;
520 apr_thread_t *thread = NULL;
521 apr_threadattr_t *thread_attr = NULL;
522 apr_thread_mutex_t *mutex = NULL;
523 request_rec *r = server->request(server);
525 // create a pool for our translator data
526 // Do not use r->pool as the parent, since r->pool will be freed
527 // when the current client disconnects.
528 if (apr_pool_create(&main_pool, NULL) != APR_SUCCESS) {
529 osrfLogError(OSRF_LOG_MARK, "WS Unable to create apr_pool");
533 trans = (osrfWebsocketTranslator*)
534 apr_palloc(main_pool, sizeof(osrfWebsocketTranslator));
537 osrfLogError(OSRF_LOG_MARK, "WS Unable to create translator");
541 trans->server = server;
542 trans->main_pool = main_pool;
543 trans->osrf_router = osrfConfigGetValue(NULL, "/router_name");
544 trans->osrf_domain = osrfConfigGetValue(NULL, "/domain");
546 // opensrf session / recipient cache
547 trans->stateful_session_cache = apr_hash_make(trans->main_pool);
548 if (trans->stateful_session_cache == NULL) {
549 osrfLogError(OSRF_LOG_MARK, "WS unable to create session cache");
553 // opensrf session / recipient string pool; cleared regularly
554 // the only data entering this pools are the session strings.
555 if (apr_pool_create(&stateful_session_pool, trans->main_pool) != APR_SUCCESS) {
556 osrfLogError(OSRF_LOG_MARK, "WS Unable to create apr_pool");
559 trans->stateful_session_pool = stateful_session_pool;
561 if (apr_thread_mutex_create(
562 &mutex, APR_THREAD_MUTEX_UNNESTED,
563 trans->main_pool) != APR_SUCCESS) {
564 osrfLogError(OSRF_LOG_MARK, "WS unable to create thread mutex");
567 trans->mutex = mutex;
570 if ( (apr_threadattr_create(&thread_attr, trans->main_pool) == APR_SUCCESS) &&
571 (apr_threadattr_detach_set(thread_attr, 0) == APR_SUCCESS) &&
572 (apr_thread_create(&thread, thread_attr,
573 osrf_responder_thread_main, trans, trans->main_pool) == APR_SUCCESS)) {
575 trans->responder_thread = thread;
578 osrfLogError(OSRF_LOG_MARK, "WS unable to create responder thread");
582 // idle timeout thread
583 thread = NULL; // reset
584 thread_attr = NULL; // reset
585 if ( (apr_threadattr_create(&thread_attr, trans->main_pool) == APR_SUCCESS) &&
586 (apr_threadattr_detach_set(thread_attr, 0) == APR_SUCCESS) &&
587 (apr_thread_create(&thread, thread_attr,
588 osrf_idle_timeout_thread_main, trans, trans->main_pool) == APR_SUCCESS)) {
590 osrfLogDebug(OSRF_LOG_MARK, "WS created idle timeout thread");
591 trans->idle_timeout_thread = thread;
594 osrfLogError(OSRF_LOG_MARK, "WS unable to create idle timeout thread");
603 * Connect to OpenSRF, create the main pool, responder thread
604 * session cache and session pool.
606 int child_init(const WebSocketServer *server) {
607 request_rec *r = server->request(server);
609 // osrf_handle will already be connected if this is not the first request
610 // served by this process.
611 if ( !(osrf_handle = osrfSystemGetTransportClient()) ) {
613 // load config values from the env
614 char* timeout = getenv("OSRF_WEBSOCKET_IDLE_TIMEOUT");
616 if (!atoi(timeout)) {
617 ap_log_rerror(APLOG_MARK, APLOG_ERR, 0, r,
618 "WS: invalid OSRF_WEBSOCKET_IDLE_TIMEOUT: %s", timeout);
620 idle_timeout_interval = (time_t) atoi(timeout);
624 ap_log_rerror(APLOG_MARK, APLOG_DEBUG, 0, r,
625 "WS: timeout set to %d", idle_timeout_interval);
627 timeout = getenv("OSRF_WEBSOCKET_MAX_REQUEST_WAIT_TIME");
629 if (!atoi(timeout)) {
630 ap_log_rerror(APLOG_MARK, APLOG_ERR, 0, r,
631 "WS: invalid OSRF_WEBSOCKET_MAX_REQUEST_WAIT_TIME: %s",
635 max_request_wait_time = (time_t) atoi(timeout);
639 ap_log_rerror(APLOG_MARK, APLOG_DEBUG, 0, r,
640 "WS: max request wait time set to %d", max_request_wait_time);
642 char* interval = getenv("OSRF_WEBSOCKET_IDLE_CHECK_INTERVAL");
644 if (!atoi(interval)) {
645 ap_log_rerror(APLOG_MARK, APLOG_ERR, 0, r,
646 "WS: invalid OSRF_WEBSOCKET_IDLE_CHECK_INTERVAL: %s",
650 idle_check_interval = (time_t) atoi(interval);
654 ap_log_rerror(APLOG_MARK, APLOG_DEBUG, 0, r,
655 "WS: idle check interval set to %d", idle_check_interval);
658 char* cfile = getenv("OSRF_WEBSOCKET_CONFIG_FILE");
660 ap_log_rerror(APLOG_MARK, APLOG_DEBUG, 0, r,
661 "WS: config file set to %s", cfile);
665 char* ctxt = getenv("OSRF_WEBSOCKET_CONFIG_CTXT");
667 ap_log_rerror(APLOG_MARK, APLOG_DEBUG, 0, r,
668 "WS: config context set to %s", ctxt);
672 // connect to opensrf
673 if (!osrfSystemBootstrapClientResc(
674 config_file, config_ctxt, "websocket")) {
676 osrfLogError(OSRF_LOG_MARK,
677 "WS unable to bootstrap OpenSRF client with config %s "
678 "and context %s", config_file, config_ctxt
683 osrfLogSetAppname("osrf_websocket_translator");
684 osrf_handle = osrfSystemGetTransportClient();
687 signal(SIGUSR1, sigusr1_handler);
692 * Create the per-client translator
694 void* CALLBACK on_connect_handler(const WebSocketServer *server) {
695 request_rec *r = server->request(server);
697 if (!trans) { // first connection
699 // connect to opensrf
700 if (child_init(server) != APR_SUCCESS)
703 // build pools, thread data, and the translator
704 if (build_startup_data(server) != APR_SUCCESS)
708 const char* client_ip = get_client_ip(r);
709 osrfLogInfo(OSRF_LOG_MARK, "WS connect from %s", client_ip);
711 last_activity_time = time(NULL);
712 trans->client_connected = 1;
718 * for each inbound opensrf message:
719 * 1. Stamp the ingress
720 * 2. REQUEST: log it as activity
721 * 3. DISCONNECT: remove the cached recipient
722 * then re-string-ify for xmpp delivery
725 static char* extract_inbound_messages(
726 const request_rec *r,
729 const char* recipient,
730 const jsonObject *osrf_msg) {
733 int num_msgs = osrf_msg->size;
735 osrfMessage* msg_list[num_msgs];
737 // here we do an extra json round-trip to get the data
738 // in a form osrf_message_deserialize can understand
739 // TODO: consider a version of osrf_message_init which can
740 // accept a jsonObject* instead of a JSON string.
741 char *osrf_msg_json = jsonObjectToJSON(osrf_msg);
742 osrf_message_deserialize(osrf_msg_json, msg_list, num_msgs);
745 // should we require the caller to always pass the service?
746 if (service == NULL) service = "";
748 for(i = 0; i < num_msgs; i++) {
750 osrfMessageSetIngress(msg, WEBSOCKET_TRANSLATOR_INGRESS);
752 switch(msg->m_type) {
755 const jsonObject* params = msg->_params;
756 growing_buffer* act = buffer_init(128);
757 char* method = msg->method_name;
758 buffer_fadd(act, "[%s] [%s] %s %s",
759 get_client_ip(r), "", service, method);
761 const jsonObject* obj = NULL;
764 int redactParams = 0;
765 while( (str = osrfStringArrayGetString(log_protect_arr, i++)) ) {
766 if(!strncmp(method, str, strlen(str))) {
772 OSRF_BUFFER_ADD(act, " **PARAMS REDACTED**");
775 while((obj = jsonObjectGetIndex(params, i++))) {
776 char* str = jsonObjectToJSON(obj);
778 OSRF_BUFFER_ADD(act, " ");
780 OSRF_BUFFER_ADD(act, ", ");
781 OSRF_BUFFER_ADD(act, str);
785 osrfLogActivity(OSRF_LOG_MARK, "%s", act->buf);
787 requests_in_flight++;
792 clear_cached_recipient(thread);
797 char* finalMsg = osrfMessageSerializeBatch(msg_list, num_msgs);
799 // clean up our messages
800 for(i = 0; i < num_msgs; i++)
801 osrfMessageFree(msg_list[i]);
807 * Parse opensrf request and relay the request to the opensrf network.
809 static size_t on_message_handler_body(void *data,
810 const WebSocketServer *server, const int type,
811 unsigned char *buffer, const size_t buffer_size) {
813 request_rec *r = server->request(server);
815 jsonObject *msg_wrapper = NULL; // free me
816 const jsonObject *tmp_obj = NULL;
817 const jsonObject *osrf_msg = NULL;
818 const char *service = NULL;
819 const char *thread = NULL;
820 const char *log_xid = NULL;
821 char *msg_body = NULL;
822 char *recipient = NULL;
825 if (buffer_size <= 0) return OK;
827 // generate a new log trace for this request. it
828 // may be replaced by a client-provided trace below.
831 osrfLogDebug(OSRF_LOG_MARK, "WS received message size=%d", buffer_size);
833 // buffer may not be \0-terminated, which jsonParse requires
834 char buf[buffer_size + 1];
835 memcpy(buf, buffer, buffer_size);
836 buf[buffer_size] = '\0';
838 osrfLogInternal(OSRF_LOG_MARK, "WS received inbound message: %s", buf);
840 msg_wrapper = jsonParse(buf);
842 if (msg_wrapper == NULL) {
843 osrfLogWarning(OSRF_LOG_MARK, "WS Invalid JSON: %s", buf);
844 return HTTP_BAD_REQUEST;
847 osrf_msg = jsonObjectGetKeyConst(msg_wrapper, "osrf_msg");
849 if (tmp_obj = jsonObjectGetKeyConst(msg_wrapper, "service"))
850 service = jsonObjectGetString(tmp_obj);
852 if (tmp_obj = jsonObjectGetKeyConst(msg_wrapper, "thread"))
853 thread = jsonObjectGetString(tmp_obj);
855 if (tmp_obj = jsonObjectGetKeyConst(msg_wrapper, "log_xid"))
856 log_xid = jsonObjectGetString(tmp_obj);
860 // use the caller-provide log trace id
861 if (strlen(log_xid) > MAX_THREAD_SIZE) {
862 osrfLogWarning(OSRF_LOG_MARK, "WS log_xid exceeds max length");
863 return HTTP_BAD_REQUEST;
866 osrfLogForceXid(log_xid);
871 if (strlen(thread) > MAX_THREAD_SIZE) {
872 osrfLogWarning(OSRF_LOG_MARK, "WS thread exceeds max length");
873 return HTTP_BAD_REQUEST;
876 // since clients can provide their own threads at session start time,
877 // the presence of a thread does not guarantee a cached recipient
878 recipient = (char*) apr_hash_get(
879 trans->stateful_session_cache, thread, APR_HASH_KEY_STRING);
882 osrfLogDebug(OSRF_LOG_MARK, "WS found cached recipient %s", recipient);
889 int size = snprintf(recipient_buf, RECIP_BUF_SIZE - 1,
890 "%s@%s/%s", trans->osrf_router, trans->osrf_domain, service);
891 recipient_buf[size] = '\0';
892 recipient = recipient_buf;
895 osrfLogWarning(OSRF_LOG_MARK, "WS Unable to determine recipient");
896 return HTTP_BAD_REQUEST;
900 osrfLogDebug(OSRF_LOG_MARK,
901 "WS relaying message to opensrf thread=%s, recipient=%s",
904 msg_body = extract_inbound_messages(
905 r, service, thread, recipient, osrf_msg);
907 osrfLogInternal(OSRF_LOG_MARK,
908 "WS relaying inbound message: %s", msg_body);
910 transport_message *tmsg = message_init(
911 msg_body, NULL, thread, recipient, NULL);
913 message_set_osrf_xid(tmsg, osrfLogGetXid());
914 client_send_message(osrf_handle, tmsg);
919 jsonObjectFree(msg_wrapper);
922 last_activity_time = time(NULL);
926 static size_t CALLBACK on_message_handler(void *data,
927 const WebSocketServer *server, const int type,
928 unsigned char *buffer, const size_t buffer_size) {
930 if (apr_thread_mutex_lock(trans->mutex) != APR_SUCCESS) {
931 osrfLogError(OSRF_LOG_MARK, "WS error locking thread mutex");
932 return 1; // TODO: map to apr_status_t value?
935 apr_status_t stat = on_message_handler_body(data, server, type, buffer, buffer_size);
937 if (apr_thread_mutex_unlock(trans->mutex) != APR_SUCCESS) {
938 osrfLogError(OSRF_LOG_MARK, "WS error locking thread mutex");
947 * Clear the session cache, release the session pool
949 void CALLBACK on_disconnect_handler(
950 void *data, const WebSocketServer *server) {
952 // if the threads wake up during disconnect, this tells
953 // them to go back to sleep.
954 trans->client_connected = 0;
956 request_rec *r = server->request(server);
957 osrfLogInfo(OSRF_LOG_MARK, "WS disconnect from %s", get_client_ip(r));
959 // Clear any lingering session data
960 // NOTE: we could apr_pool_destroy the stateful_session_pool to truly free
961 // the memory, but since there is a limit to the size of the pool
962 // (max_concurrent_sessions), the memory cannot grow unbounded,
963 // so there's no need.
964 apr_hash_clear(trans->stateful_session_cache);
965 apr_pool_clear(trans->stateful_session_pool);
968 static WebSocketPlugin osrf_websocket_plugin = {
969 sizeof(WebSocketPlugin),
970 WEBSOCKET_PLUGIN_VERSION_0,
971 NULL, // on_destroy_handler
974 on_disconnect_handler
977 extern EXPORT WebSocketPlugin * CALLBACK osrf_websocket_init() {
978 return &osrf_websocket_plugin;