*/
#include "httpd.h"
-#include "http_log.h"
-#include "http_log.h"
#include "apr_strings.h"
#include "apr_thread_proc.h"
#include "apr_hash.h"
#include "websocket_plugin.h"
+#include "opensrf/log.h"
#include "opensrf/osrf_json.h"
#include "opensrf/transport_client.h"
#include "opensrf/transport_message.h"
#define MAX_THREAD_SIZE 64
#define RECIP_BUF_SIZE 128
-static char recipient_buf[RECIP_BUF_SIZE]; // reusable recipient buffer
-static transport_client *osrf_handle = NULL;
typedef struct _osrfWebsocketTranslator {
const WebSocketServer *server;
- apr_pool_t *main_pool; // standline per-process pool
+ apr_pool_t *main_pool; // standalone per-process pool
apr_pool_t *session_pool; // child of trans->main_pool; per-session
apr_hash_t *session_cache;
apr_thread_t *responder_thread;
} osrfWebsocketTranslator;
static osrfWebsocketTranslator *trans = NULL;
+static transport_client *osrf_handle = NULL;
+static char recipient_buf[RECIP_BUF_SIZE]; // reusable recipient buffer
/**
*/
void* APR_THREAD_FUNC osrf_responder_thread_main(apr_thread_t *thread, void *data) {
- request_rec *r = trans->server->request(trans->server);
+ transport_message *tmsg;
jsonObject *msg_wrapper;
char *msg_string;
while (1) {
- transport_message *msg = client_recv(osrf_handle, -1);
- if (!msg) continue; // early exit on interrupt
+ tmsg = client_recv(osrf_handle, -1);
+
+ if (!tmsg) continue; // early exit on interrupt
// discard responses received after client disconnect
if (!trans->client_connected) {
- ap_log_rerror(APLOG_MARK, APLOG_DEBUG, 0, r,
+ osrfLogDebug(OSRF_LOG_MARK,
"WS discarding response for thread=%s, xid=%s",
- msg->thread, msg->osrf_xid);
- message_free(msg);
+ tmsg->thread, tmsg->osrf_xid);
+ message_free(tmsg);
continue;
}
- ap_log_rerror(APLOG_MARK, APLOG_DEBUG, 0, r,
+
+ osrfLogDebug(OSRF_LOG_MARK,
"WS received opensrf response for thread=%s, xid=%s",
- msg->thread, msg->osrf_xid);
+ tmsg->thread, tmsg->osrf_xid);
// build the wrapper object
msg_wrapper = jsonNewObject(NULL);
- jsonObjectSetKey(msg_wrapper, "thread", jsonNewObject(msg->thread));
- jsonObjectSetKey(msg_wrapper, "log_xid", jsonNewObject(msg->osrf_xid));
- jsonObjectSetKey(msg_wrapper, "osrf_msg", jsonParseRaw(msg->body));
+ jsonObjectSetKey(msg_wrapper, "thread", jsonNewObject(tmsg->thread));
+ jsonObjectSetKey(msg_wrapper, "log_xid", jsonNewObject(tmsg->osrf_xid));
+ jsonObjectSetKey(msg_wrapper, "osrf_msg", jsonParseRaw(tmsg->body));
- if (msg->is_error) {
- ap_log_rerror(APLOG_MARK, APLOG_ERR, 0, r,
+ if (tmsg->is_error) {
+ fprintf(stderr,
"WS received jabber error message in response to thread=%s and xid=%s",
- msg->thread, msg->osrf_xid);
+ tmsg->thread, tmsg->osrf_xid);
+ fflush(stderr);
jsonObjectSetKey(msg_wrapper, "transport_error", jsonNewBoolObject(1));
}
// capture the true message sender
// TODO: this will grow to add one entry per client session.
- // need a last-touched timeout mechanism to periodically remove old entries
- if (!apr_hash_get(trans->session_cache, msg->thread, APR_HASH_KEY_STRING)) {
+ // need to ensure that connected-sessions don't last /too/ long or create
+ // a last-touched timeout mechanism to periodically remove old entries
+ if (!apr_hash_get(trans->session_cache, tmsg->thread, APR_HASH_KEY_STRING)) {
- ap_log_rerror(APLOG_MARK, APLOG_DEBUG, 0, r,
- "WS caching sender thread=%s, sender=%s", msg->thread, msg->sender);
+ osrfLogDebug(OSRF_LOG_MARK,
+ "WS caching sender thread=%s, sender=%s", tmsg->thread, tmsg->sender);
apr_hash_set(trans->session_cache,
- apr_pstrdup(trans->session_pool, msg->thread),
+ apr_pstrdup(trans->session_pool, tmsg->thread),
APR_HASH_KEY_STRING,
- apr_pstrdup(trans->session_pool, msg->sender));
+ apr_pstrdup(trans->session_pool, tmsg->sender));
}
free(msg_string);
jsonObjectFree(msg_wrapper);
- message_free(msg);
+ message_free(tmsg);
}
return NULL;
apr_threadattr_t *thread_attr = NULL;
request_rec *r = server->request(server);
- ap_log_rerror(APLOG_MARK, APLOG_DEBUG, 0, r, "WS child_init");
+ osrfLogDebug(OSRF_LOG_MARK, "WS child_init");
// osrf_handle will already be connected if this is not the first request
// served by this process.
char* config_file = "/openils/conf/opensrf_core.xml";
char* config_ctx = "gateway"; //TODO config
if (!osrfSystemBootstrapClientResc(config_file, config_ctx, "websocket")) {
- ap_log_rerror(APLOG_MARK, APLOG_ERR, 0, r,
+ osrfLogError(OSRF_LOG_MARK,
"WS unable to bootstrap OpenSRF client with config %s", config_file);
return 1;
}
// create a standalone pool for our translator data
if (apr_pool_create(&pool, NULL) != APR_SUCCESS) {
- ap_log_rerror(APLOG_MARK, APLOG_ERR, 0, r, "WS Unable to create apr_pool");
+ osrfLogError(OSRF_LOG_MARK, "WS Unable to create apr_pool");
return 1;
}
apr_palloc(pool, sizeof(osrfWebsocketTranslator));
if (trans == NULL) {
- ap_log_rerror(APLOG_MARK, APLOG_ERR, 0, r, "WS Unable to create translator");
+ osrfLogError(OSRF_LOG_MARK, "WS Unable to create translator");
return 1;
}
trans->osrf_router = osrfConfigGetValue(NULL, "/router_name");
trans->osrf_domain = osrfConfigGetValue(NULL, "/domain");
- // Create the responder thread. Once created, it runs for the lifetime
- // of this process.
+ // Create the responder thread. Once created,
+ // it runs for the lifetime of this process.
if ( (apr_threadattr_create(&thread_attr, trans->main_pool) == APR_SUCCESS) &&
(apr_threadattr_detach_set(thread_attr, 0) == APR_SUCCESS) &&
(apr_thread_create(&thread, thread_attr,
trans->responder_thread = thread;
} else {
- ap_log_rerror(APLOG_MARK, APLOG_ERR, 0, r,
- "WS unable to create responder thread");
+ osrfLogError(OSRF_LOG_MARK, "WS unable to create responder thread");
return 1;
}
request_rec *r = server->request(server);
apr_pool_t *pool;
- ap_log_rerror(APLOG_MARK, APLOG_DEBUG, 0, r,
- "WS connect from %s", r->connection->remote_ip);
+ osrfLogDebug(OSRF_LOG_MARK,
+ "WS connect from %s", r->connection->remote_ip);
+ //"WS connect from %s", r->connection->client_ip); // apache 2.4
if (!trans) {
if (child_init(server) != APR_SUCCESS) {
// create a standalone pool for the session cache values, which will be
// destroyed on client disconnect.
if (apr_pool_create(&pool, trans->main_pool) != APR_SUCCESS) {
- ap_log_rerror(APLOG_MARK, APLOG_ERR, 0, r,
- "WS Unable to create apr_pool");
+ osrfLogError(OSRF_LOG_MARK, "WS Unable to create apr_pool");
return NULL;
}
trans->session_pool = pool;
trans->session_cache = apr_hash_make(trans->session_pool);
- ap_log_rerror(APLOG_MARK, APLOG_ERR, 0, r,
- "WS created new pool %x", trans->session_pool);
-
if (trans->session_cache == NULL) {
- ap_log_rerror(APLOG_MARK, APLOG_ERR, 0, r,
- "WS unable to create session cache");
+ osrfLogError(OSRF_LOG_MARK, "WS unable to create session cache");
return NULL;
}
- ap_log_rerror(APLOG_MARK, APLOG_ERR, 0, r,
- "WS created new hash %x", trans->session_cache);
-
trans->client_connected = 1;
return trans;
}
if (buffer_size <= 0) return OK;
- ap_log_rerror(APLOG_MARK, APLOG_DEBUG, 0, r,
- "WS received message size=%d", buffer_size);
+ osrfLogDebug(OSRF_LOG_MARK, "WS received message size=%d", buffer_size);
// buffer may not be \0-terminated, which jsonParse requires
char buf[buffer_size + 1];
msg_wrapper = jsonParseRaw(buf);
if (msg_wrapper == NULL) {
- ap_log_rerror(APLOG_MARK,
- APLOG_NOTICE, 0, r, "WS Invalid JSON: %s", buf);
+ osrfLogWarning(OSRF_LOG_MARK, "WS Invalid JSON: %s", buf);
return HTTP_BAD_REQUEST;
}
log_xid = jsonObjectGetString(tmp_obj);
if (log_xid) {
+
// use the caller-provide log trace id
if (strlen(log_xid) > MAX_THREAD_SIZE) {
- ap_log_rerror(APLOG_MARK, APLOG_NOTICE,
- 0, r, "WS log_xid exceeds max length");
+ osrfLogWarning(OSRF_LOG_MARK, "WS log_xid exceeds max length");
return HTTP_BAD_REQUEST;
}
- osrfLogSetXid(log_xid); // TODO: make with with non-client
+
+ // TODO: make this work with non-client and make this call accept
+ // const char*'s. casting to (char*) for now to silence warnings.
+ osrfLogSetXid((char*) log_xid);
+
} else {
// generate a new log trace id for this relay
osrfLogMkXid();
if (thread) {
if (strlen(thread) > MAX_THREAD_SIZE) {
- ap_log_rerror(APLOG_MARK, APLOG_NOTICE,
- 0, r, "WS thread exceeds max length");
+ osrfLogWarning(OSRF_LOG_MARK, "WS thread exceeds max length");
return HTTP_BAD_REQUEST;
}
trans->session_cache, thread, APR_HASH_KEY_STRING);
if (recipient) {
- ap_log_rerror(APLOG_MARK, APLOG_DEBUG, 0, r,
- "WS found cached recipient %s", recipient);
+ osrfLogDebug(OSRF_LOG_MARK, "WS found cached recipient %s", recipient);
}
}
recipient = recipient_buf;
} else {
- ap_log_rerror(APLOG_MARK, APLOG_NOTICE,
- 0, r, "WS Unable to determine recipient");
+ osrfLogWarning(OSRF_LOG_MARK, "WS Unable to determine recipient");
return HTTP_BAD_REQUEST;
}
}
// TODO: activity log entry? -- requires message analysis
- ap_log_rerror(APLOG_MARK, APLOG_DEBUG, 0, r,
+ osrfLogDebug(OSRF_LOG_MARK,
"WS relaying message thread=%s, xid=%s, recipient=%s",
thread, osrfLogGetXid(), recipient);
osrfLogClearXid();
message_free(tmsg);
- free(msg_wrapper);
+ jsonObjectFree(msg_wrapper);
free(msg_body);
return OK;
trans->session_cache = NULL;
request_rec *r = server->request(server);
- ap_log_rerror(APLOG_MARK, APLOG_DEBUG, 0, r,
- "WS disconnect from %s", r->connection->remote_ip);
+ osrfLogDebug(OSRF_LOG_MARK,
+ "WS disconnect from %s", r->connection->remote_ip);
+ //"WS disconnect from %s", r->connection->client_ip); // apache 2.4
}
+/**
+ * Be nice and clean up our mess
+ */
void CALLBACK on_destroy_handler(WebSocketPlugin *plugin) {
- fprintf(stderr, "WS on_destroy_handler()\n");
- fflush(stderr);
-
if (trans) {
apr_thread_exit(trans->responder_thread, APR_SUCCESS);
apr_pool_destroy(trans->main_pool);