#include <opensrf/osrfConfig.h>
#include <opensrf/osrf_json.h>
#include <opensrf/osrf_cache.h>
+#include <opensrf/string_array.h>
#define MODULE_NAME "osrf_http_translator_module"
#define OSRF_TRANSLATOR_CONFIG_FILE "OSRFTranslatorConfig"
#define JSON_CONTENT_TYPE "text/plain"
#define MAX_MSGS_PER_PACKET 256
#define CACHE_TIME 300
+#define TRANSLATOR_INGRESS "translator-v1"
#define OSRF_HTTP_HEADER_TO "X-OpenSRF-to"
#define OSRF_HTTP_HEADER_XID "X-OpenSRF-xid"
#define OSRF_HTTP_HEADER_SERVICE "X-OpenSRF-service"
#define OSRF_HTTP_HEADER_MULTIPART "X-OpenSRF-multipart"
+
char* configFile = DEFAULT_TRANSLATOR_CONFIG_FILE;
char* configCtx = DEFAULT_TRANSLATOR_CONFIG_CTX;
char* cacheServers = DEFAULT_TRANSLATOR_CACHE_SERVERS;
int osrfConnected = 0;
char recipientBuf[128];
char contentTypeBuf[80];
+osrfStringArray* allowedOrigins = NULL;
+#if 0
+// Commented out to avoid compiler warning
// for development only, writes to apache error log
static void _dbg(char* s, ...) {
VA_LIST_TO_STRING(s);
fprintf(stderr, "%s\n", VA_BUF);
fflush(stderr);
}
+#endif
// Translator struct
typedef struct {
int complete;
int timeout;
int multipart;
- int connectOnly;
- int disconnectOnly;
+ int connectOnly; // there is only 1 message, a CONNECT
+ int disconnectOnly; // there is only 1 message, a DISCONNECT
+ int connecting; // there is a connect message in this batch
+ int disconnecting; // there is a connect message in this batch
int localXid;
} osrfHttpTranslator;
return NULL;
}
-/** set up the configuratoin handlers */
-static const command_rec osrf_json_gateway_cmds[] = {
+/** set up the configuration handlers */
+static const command_rec osrfHttpTranslatorCmds[] = {
AP_INIT_TAKE1( OSRF_TRANSLATOR_CONFIG_FILE, osrfHttpTranslatorGetConfigFile,
NULL, RSRC_CONF, "osrf translator config file"),
AP_INIT_TAKE1( OSRF_TRANSLATOR_CONFIG_CTX, osrfHttpTranslatorGetConfigFileCtx,
trans->complete = 0;
trans->connectOnly = 0;
trans->disconnectOnly = 0;
+ trans->connecting = 0;
+ trans->disconnecting = 0;
+#ifdef APACHE_MIN_24
+ trans->remoteHost = apreq->useragent_ip;
+#else
trans->remoteHost = apreq->connection->remote_ip;
+#endif
trans->messages = NULL;
/* load the message body */
- osrfStringArray* params = apacheParseParms(apreq);
+ osrfStringArray* params = apacheParseParms(apreq);
trans->body = apacheGetFirstParamValue(params, "osrf-msg");
osrfStringArrayFree(params);
/* load the request headers */
- if (apr_table_get(apreq->headers_in, OSRF_HTTP_HEADER_XID)) // force our log xid to match the caller
- osrfLogForceXid(strdup(apr_table_get(apreq->headers_in, OSRF_HTTP_HEADER_XID)));
+ if (apr_table_get(apreq->headers_in, OSRF_HTTP_HEADER_XID))
+ // force our log xid to match the caller
+ osrfLogForceXid(strdup(apr_table_get(apreq->headers_in, OSRF_HTTP_HEADER_XID)));
trans->handle = osrfSystemGetTransportClient();
trans->recipient = apr_table_get(apreq->headers_in, OSRF_HTTP_HEADER_TO);
trans->service = apr_table_get(apreq->headers_in, OSRF_HTTP_HEADER_SERVICE);
- trans->thread = apr_table_get(apreq->headers_in, OSRF_HTTP_HEADER_THREAD); /* XXX create thread if necessary */
const char* timeout = apr_table_get(apreq->headers_in, OSRF_HTTP_HEADER_TIMEOUT);
if(timeout)
snprintf(buf, sizeof(buf), "%d%ld", getpid(), time(NULL));
trans->delim = md5sum(buf);
+ /* Use thread if it has been passed in; otherwise, just use the delimiter */
+ trans->thread = apr_table_get(apreq->headers_in, OSRF_HTTP_HEADER_THREAD)
+ ? apr_table_get(apreq->headers_in, OSRF_HTTP_HEADER_THREAD)
+ : (const char*)trans->delim;
+
return trans;
}
osrfListFree(trans->messages);
}
+#if 0
+// Commented out to avoid compiler warning
static void osrfHttpTranslatorDebug(osrfHttpTranslator* trans) {
_dbg("-----------------------------------");
_dbg("body = %s", trans->body);
_dbg("multipart = %d", trans->multipart);
_dbg("recipient = %s", trans->recipient);
}
+#endif
/**
* Determines the correct recipient address based on the requested
} else {
// service is specified, build a recipient address
// from the router, domain, and service
- int size = snprintf(recipientBuf, 128, "%s@%s/%s", routerName, domainName, trans->service);
+ int size = snprintf(recipientBuf, 128, "%s@%s/%s", routerName,
+ domainName, trans->service);
recipientBuf[size] = '\0';
osrfLogDebug(OSRF_LOG_MARK, "Set recipient to %s", recipientBuf);
trans->recipient = recipientBuf;
sessionCache = osrfCacheGetObject(trans->thread);
if(sessionCache) {
- char* ipAddr = jsonObjectGetString(jsonObjectGetKey(sessionCache, "ip"));
- char* recipient = jsonObjectGetString(jsonObjectGetKey(sessionCache, "jid"));
+ const char* ipAddr = jsonObjectGetString(
+ jsonObjectGetKeyConst( sessionCache, "ip" ));
+ const char* recipient = jsonObjectGetString(
+ jsonObjectGetKeyConst( sessionCache, "jid" ));
// choosing a specific recipient address requires that the recipient and
// thread be cached on the server (so drone processes cannot be hijacked)
if(!strcmp(ipAddr, trans->remoteHost) && !strcmp(recipient, trans->recipient)) {
- osrfLogDebug(OSRF_LOG_MARK, "Found cached session from host %s and recipient %s",
+ osrfLogDebug( OSRF_LOG_MARK,
+ "Found cached session from host %s and recipient %s",
trans->remoteHost, trans->recipient);
stat = 1;
- trans->service = jsonObjectGetString(jsonObjectGetKey(sessionCache, "service"));
+ trans->service = apr_pstrdup(
+ trans->apreq->pool, jsonObjectGetString(
+ jsonObjectGetKeyConst( sessionCache, "service" )));
} else {
osrfLogError(OSRF_LOG_MARK,
}
/**
- * Parses the request body and logs any REQUEST messages to the activity log
+ * Parses the request body, logs any REQUEST messages to the activity log,
+ * stamps the translator ingress on each message, and returns the updated
+ * messages as a JSON string.
*/
-static int osrfHttpTranslatorParseRequest(osrfHttpTranslator* trans) {
+static char* osrfHttpTranslatorParseRequest(osrfHttpTranslator* trans) {
osrfMessage* msg;
osrfMessage* msgList[MAX_MSGS_PER_PACKET];
int numMsgs = osrf_message_deserialize(trans->body, msgList, MAX_MSGS_PER_PACKET);
osrfLogDebug(OSRF_LOG_MARK, "parsed %d opensrf messages in this packet", numMsgs);
if(numMsgs == 0)
- return 0;
-
- if(numMsgs == 1) {
- msg = msgList[0];
- if(msg->m_type == CONNECT) {
- trans->connectOnly = 1;
- return 1;
- }
- if(msg->m_type == DISCONNECT) {
- trans->disconnectOnly = 1;
- return 1;
- }
- }
+ return NULL;
// log request messages to the activity log
int i;
for(i = 0; i < numMsgs; i++) {
msg = msgList[i];
- if(msg->m_type == REQUEST) {
-
- jsonObject* params = msg->_params;
- growing_buffer* act = buffer_init(128);
- buffer_fadd(act, "[%s] [%s] %s %s", trans->remoteHost, "", trans->service, msg->method_name);
-
- char* str;
- int i = 0;
- while((str = jsonObjectGetString(jsonObjectGetIndex(params, i++)))) {
- if( i == 1 )
- OSRF_BUFFER_ADD(act, " ");
- else
- OSRF_BUFFER_ADD(act, ", ");
- OSRF_BUFFER_ADD(act, str);
+ osrfMessageSetIngress(msg, TRANSLATOR_INGRESS);
+
+ switch(msg->m_type) {
+
+ case REQUEST: {
+ const jsonObject* params = msg->_params;
+ growing_buffer* act = buffer_init(128);
+ char* method = msg->method_name;
+ buffer_fadd(act, "[%s] [%s] %s %s", trans->remoteHost, "",
+ trans->service, method);
+
+ const jsonObject* obj = NULL;
+ int i = 0;
+ const char* str;
+ int redactParams = 0;
+ while( (str = osrfStringArrayGetString(log_protect_arr, i++)) ) {
+ //osrfLogInternal(OSRF_LOG_MARK, "Checking for log protection [%s]", str);
+ if(!strncmp(method, str, strlen(str))) {
+ redactParams = 1;
+ break;
+ }
+ }
+ if(redactParams) {
+ OSRF_BUFFER_ADD(act, " **PARAMS REDACTED**");
+ } else {
+ i = 0;
+ while((obj = jsonObjectGetIndex(params, i++))) {
+ str = jsonObjectToJSON(obj);
+ if( i == 1 )
+ OSRF_BUFFER_ADD(act, " ");
+ else
+ OSRF_BUFFER_ADD(act, ", ");
+ OSRF_BUFFER_ADD(act, str);
+ free((void *)str);
+ }
+ }
+ osrfLogActivity(OSRF_LOG_MARK, "%s", act->buf);
+ buffer_free(act);
+ break;
}
- osrfLogActivity(OSRF_LOG_MARK, act->buf);
- buffer_free(act);
+
+ case CONNECT:
+ trans->connecting = 1;
+ if (numMsgs == 1)
+ trans->connectOnly = 1;
+ break;
+
+ case DISCONNECT:
+ trans->disconnecting = 1;
+ if (numMsgs == 1)
+ trans->disconnectOnly = 1;
+ break;
+
+ case RESULT:
+ osrfLogWarning( OSRF_LOG_MARK, "Unexpected RESULT message received" );
+ break;
+
+ case STATUS:
+ osrfLogWarning( OSRF_LOG_MARK, "Unexpected STATUS message received" );
+ break;
+
+ default:
+ osrfLogWarning( OSRF_LOG_MARK, "Invalid message type %d received",
+ msg->m_type );
+ break;
}
}
- return 1;
+ char* jsonString = osrfMessageSerializeBatch(msgList, numMsgs);
+ for(i = 0; i < numMsgs; i++) {
+ osrfMessageFree(msgList[i]);
+ }
+ return jsonString;
}
static int osrfHttpTranslatorCheckStatus(osrfHttpTranslator* trans, transport_message* msg) {
if(trans->multipart) {
sprintf(contentTypeBuf, MULTIPART_CONTENT_TYPE, trans->delim);
contentTypeBuf[79] = '\0';
- osrfLogDebug(OSRF_LOG_MARK, "content type %s : %s : %s", MULTIPART_CONTENT_TYPE, trans->delim, contentTypeBuf);
- ap_set_content_type(trans->apreq, contentTypeBuf);
+ osrfLogDebug(OSRF_LOG_MARK, "content type %s : %s : %s", MULTIPART_CONTENT_TYPE,
+ trans->delim, contentTypeBuf);
+ ap_set_content_type(trans->apreq, contentTypeBuf);
ap_rprintf(trans->apreq, "--%s\n", trans->delim);
} else {
- ap_set_content_type(trans->apreq, JSON_CONTENT_TYPE);
+ ap_set_content_type(trans->apreq, JSON_CONTENT_TYPE);
}
}
-static void osrfHttpTranslatorCacheSession(osrfHttpTranslator* trans) {
+/**
+ * Cache the transaction with the JID of the backend process we are talking to
+ */
+static void osrfHttpTranslatorCacheSession(osrfHttpTranslator* trans, const char* jid) {
jsonObject* cacheObj = jsonNewObject(NULL);
jsonObjectSetKey(cacheObj, "ip", jsonNewObject(trans->remoteHost));
- jsonObjectSetKey(cacheObj, "jid", jsonNewObject(trans->recipient));
+ jsonObjectSetKey(cacheObj, "jid", jsonNewObject(jid));
jsonObjectSetKey(cacheObj, "service", jsonNewObject(trans->service));
- osrfCachePutObject((char*) trans->thread, cacheObj, CACHE_TIME);
+ osrfCachePutObject(trans->thread, cacheObj, CACHE_TIME);
}
-
+
/**
* Writes a single chunk of multipart/x-mixed-replace content
*/
static void osrfHttpTranslatorWriteChunk(osrfHttpTranslator* trans, transport_message* msg) {
+ osrfLogInternal(OSRF_LOG_MARK, "sending multipart chunk %s", msg->body);
ap_rprintf(trans->apreq,
"Content-type: %s\n\n%s\n\n", JSON_CONTENT_TYPE, msg->body);
- osrfLogInternal(OSRF_LOG_MARK, "Apache sending data: Content-type: %s\n\n%s\n\n", JSON_CONTENT_TYPE, msg->body);
+ //osrfLogInternal(OSRF_LOG_MARK, "Apache sending data: Content-type: %s\n\n%s\n\n",
+ //JSON_CONTENT_TYPE, msg->body);
if(trans->complete) {
ap_rprintf(trans->apreq, "--%s--\n", trans->delim);
- osrfLogInternal(OSRF_LOG_MARK, "Apache sending data: --%s--\n", trans->delim);
+ //osrfLogInternal(OSRF_LOG_MARK, "Apache sending data: --%s--\n", trans->delim);
} else {
ap_rprintf(trans->apreq, "--%s\n", trans->delim);
- osrfLogInternal(OSRF_LOG_MARK, "Apache sending data: --%s\n", trans->delim);
+ //osrfLogInternal(OSRF_LOG_MARK, "Apache sending data: --%s\n", trans->delim);
}
ap_rflush(trans->apreq);
}
if(!osrfHttpTranslatorSetTo(trans))
return HTTP_BAD_REQUEST;
- if(!osrfHttpTranslatorParseRequest(trans))
+ char* jsonBody = osrfHttpTranslatorParseRequest(trans);
+ if (NULL == jsonBody)
return HTTP_BAD_REQUEST;
while(client_recv(trans->handle, 0))
// send the message to the recipient
transport_message* tmsg = message_init(
- trans->body, NULL, trans->thread, trans->recipient, NULL);
+ jsonBody, NULL, trans->thread, trans->recipient, NULL);
message_set_osrf_xid(tmsg, osrfLogGetXid());
client_send_message(trans->handle, tmsg);
message_free(tmsg);
+ free(jsonBody);
if(trans->disconnectOnly) {
osrfLogDebug(OSRF_LOG_MARK, "exiting early on disconnect");
+ osrfCacheRemove(trans->thread);
return OK;
}
if(trans->handle->error) {
osrfLogError(OSRF_LOG_MARK, "Transport error");
+ osrfCacheRemove(trans->thread);
return HTTP_INTERNAL_SERVER_ERROR;
}
if(msg->is_error) {
osrfLogError(OSRF_LOG_MARK, "XMPP message resulted in error code %d", msg->error_code);
+ osrfCacheRemove(trans->thread);
return HTTP_NOT_FOUND;
}
if(firstWrite) {
osrfHttpTranslatorInitHeaders(trans, msg);
- osrfHttpTranslatorCacheSession(trans);
+ if(trans->connecting)
+ osrfHttpTranslatorCacheSession(trans, msg->sender);
firstWrite = 0;
}
if(trans->complete || trans->connectOnly) {
growing_buffer* buf = buffer_init(128);
- int i;
+ unsigned int i;
OSRF_BUFFER_ADD(buf, osrfListGetIndex(trans->messages, 0));
for(i = 1; i < trans->messages->size; i++) {
buffer_chomp(buf); // chomp off the closing array bracket
char* body = osrfListGetIndex(trans->messages, i);
char newbuf[strlen(body)];
- sprintf(newbuf, body+1); // chomp off the opening array bracket
+ sprintf(newbuf, "%s", body+1); // chomp off the opening array bracket
OSRF_BUFFER_ADD_CHAR(buf, ',');
OSRF_BUFFER_ADD(buf, newbuf);
}
-
+
ap_rputs(buf->buf, trans->apreq);
buffer_free(buf);
}
}
}
+ if(trans->disconnecting) // DISCONNECT within a multi-message batch
+ osrfCacheRemove(trans->thread);
+
return OK;
}
}
}
+#if 0
+// Commented out to avoid compiler warning
// it's dead, Jim
static apr_status_t childExit(void* data) {
osrf_system_shutdown();
return OK;
}
+#endif
static void childInit(apr_pool_t *p, server_rec *s) {
if(!osrfSystemBootstrapClientResc(configFile, configCtx, "translator")) {
osrfCacheInit(servers, 1, 86400);
osrfConnected = 1;
+ allowedOrigins = osrfNewStringArray(4);
+ osrfConfigGetValueList(NULL, allowedOrigins, "/cross_origin/origin");
+
// at pool destroy time (= child exit time), cleanup
// XXX causes us to disconnect even for clone()'d process cleanup (as in mod_cgi)
//apr_pool_cleanup_register(p, NULL, childExit, apr_pool_cleanup_null);
r->allowed |= (AP_METHOD_BIT << M_POST);
osrfLogSetAppname("osrf_http_translator");
+ osrfAppSessionSetIngress(TRANSLATOR_INGRESS);
testConnection(r);
+ crossOriginHeaders(r, allowedOrigins);
- osrfHttpTranslator* trans = osrfNewHttpTranslator(r);
osrfLogMkXid();
+ osrfHttpTranslator* trans = osrfNewHttpTranslator(r);
if(trans->body) {
stat = osrfHttpTranslatorProcess(trans);
//osrfHttpTranslatorDebug(trans);
NULL,
NULL,
NULL,
- NULL,
+ osrfHttpTranslatorCmds,
registerHooks,
};
-
-
-
-