LP#1684970 Translator compatible with mod_remoteip
[opensrf-equinox.git] / src / gateway / osrf_http_translator.c
index 01c8fcc..ec8a685 100644 (file)
@@ -8,6 +8,7 @@
 #include <opensrf/osrfConfig.h>
 #include <opensrf/osrf_json.h>
 #include <opensrf/osrf_cache.h>
+#include <opensrf/string_array.h>
 
 #define MODULE_NAME "osrf_http_translator_module"
 #define OSRF_TRANSLATOR_CONFIG_FILE "OSRFTranslatorConfig"
@@ -23,6 +24,7 @@
 #define JSON_CONTENT_TYPE "text/plain"
 #define MAX_MSGS_PER_PACKET 256
 #define CACHE_TIME 300
+#define TRANSLATOR_INGRESS "translator-v1"
 
 #define OSRF_HTTP_HEADER_TO "X-OpenSRF-to"
 #define OSRF_HTTP_HEADER_XID "X-OpenSRF-xid"
@@ -32,6 +34,7 @@
 #define OSRF_HTTP_HEADER_SERVICE "X-OpenSRF-service"
 #define OSRF_HTTP_HEADER_MULTIPART "X-OpenSRF-multipart"
 
+
 char* configFile = DEFAULT_TRANSLATOR_CONFIG_FILE;
 char* configCtx = DEFAULT_TRANSLATOR_CONFIG_CTX;
 char* cacheServers = DEFAULT_TRANSLATOR_CACHE_SERVERS;
@@ -41,13 +44,17 @@ char* domainName = NULL;
 int osrfConnected = 0;
 char recipientBuf[128];
 char contentTypeBuf[80];
+osrfStringArray* allowedOrigins = NULL;
 
+#if 0
+// Disabled via #if 0 to avoid a compiler warning
 // for development only, writes to apache error log
 static void _dbg(char* s, ...) {
     VA_LIST_TO_STRING(s);
     fprintf(stderr, "%s\n", VA_BUF);
     fflush(stderr);
 }
+#endif
 
 // Translator struct
 typedef struct {
@@ -63,8 +70,10 @@ typedef struct {
     int complete;
     int timeout;
     int multipart;
-    int connectOnly;
-    int disconnectOnly;
+    int connectOnly; // there is only 1 message, a CONNECT
+    int disconnectOnly; // there is only 1 message, a DISCONNECT
+    int connecting; // there is a connect message in this batch
+    int disconnecting; // there is a disconnect message in this batch
     int localXid;
 } osrfHttpTranslator;
 
@@ -82,8 +91,8 @@ static const char* osrfHttpTranslatorGetCacheServer(cmd_parms *parms, void *conf
        return NULL;
 }
 
-/** set up the configuratoin handlers */
-static const command_rec osrf_json_gateway_cmds[] = {
+/** set up the configuration handlers */
+static const command_rec osrfHttpTranslatorCmds[] = {
        AP_INIT_TAKE1( OSRF_TRANSLATOR_CONFIG_FILE, osrfHttpTranslatorGetConfigFile,
                        NULL, RSRC_CONF, "osrf translator config file"),
        AP_INIT_TAKE1( OSRF_TRANSLATOR_CONFIG_CTX, osrfHttpTranslatorGetConfigFileCtx,
@@ -107,22 +116,28 @@ static osrfHttpTranslator* osrfNewHttpTranslator(request_rec* apreq) {
     trans->complete = 0;
     trans->connectOnly = 0;
     trans->disconnectOnly = 0;
+    trans->connecting = 0;
+    trans->disconnecting = 0;
+#ifdef APACHE_MIN_24
+    trans->remoteHost = apreq->useragent_ip;
+#else
     trans->remoteHost = apreq->connection->remote_ip;
+#endif
     trans->messages = NULL;
 
     /* load the message body */
-       osrfStringArray* params = apacheParseParms(apreq);
+    osrfStringArray* params    = apacheParseParms(apreq);
     trans->body = apacheGetFirstParamValue(params, "osrf-msg");
     osrfStringArrayFree(params);
 
     /* load the request headers */
-    if (apr_table_get(apreq->headers_in, OSRF_HTTP_HEADER_XID)) // force our log xid to match the caller
-           osrfLogForceXid(strdup(apr_table_get(apreq->headers_in, OSRF_HTTP_HEADER_XID)));
+    if (apr_table_get(apreq->headers_in, OSRF_HTTP_HEADER_XID))
+        // force our log xid to match the caller
+        osrfLogForceXid(strdup(apr_table_get(apreq->headers_in, OSRF_HTTP_HEADER_XID)));
 
     trans->handle = osrfSystemGetTransportClient();
     trans->recipient = apr_table_get(apreq->headers_in, OSRF_HTTP_HEADER_TO);
     trans->service = apr_table_get(apreq->headers_in, OSRF_HTTP_HEADER_SERVICE);
-    trans->thread = apr_table_get(apreq->headers_in, OSRF_HTTP_HEADER_THREAD); /* XXX create thread if necessary */
 
     const char* timeout = apr_table_get(apreq->headers_in, OSRF_HTTP_HEADER_TIMEOUT);
     if(timeout) 
@@ -140,6 +155,11 @@ static osrfHttpTranslator* osrfNewHttpTranslator(request_rec* apreq) {
     snprintf(buf, sizeof(buf), "%d%ld", getpid(), time(NULL));
     trans->delim = md5sum(buf);
 
+    /* Use thread if it has been passed in; otherwise, just use the delimiter */
+    trans->thread = apr_table_get(apreq->headers_in, OSRF_HTTP_HEADER_THREAD)
+        ?  apr_table_get(apreq->headers_in, OSRF_HTTP_HEADER_THREAD)
+        : (const char*)trans->delim;
+
     return trans;
 }
 
@@ -152,6 +172,8 @@ static void osrfHttpTranslatorFree(osrfHttpTranslator* trans) {
     osrfListFree(trans->messages);
 }
 
+#if 0
+// Disabled via #if 0 to avoid a compiler warning
 static void osrfHttpTranslatorDebug(osrfHttpTranslator* trans) {
     _dbg("-----------------------------------");
     _dbg("body = %s", trans->body);
@@ -160,6 +182,7 @@ static void osrfHttpTranslatorDebug(osrfHttpTranslator* trans) {
     _dbg("multipart = %d", trans->multipart);
     _dbg("recipient = %s", trans->recipient);
 }
+#endif
 
 /**
  * Determines the correct recipient address based on the requested 
@@ -176,7 +199,8 @@ static int osrfHttpTranslatorSetTo(osrfHttpTranslator* trans) {
         } else {
             // service is specified, build a recipient address 
             // from the router, domain, and service
-            int size = snprintf(recipientBuf, 128, "%s@%s/%s", routerName, domainName, trans->service);
+            int size = snprintf(recipientBuf, 128, "%s@%s/%s", routerName,
+                domainName, trans->service);
             recipientBuf[size] = '\0';
             osrfLogDebug(OSRF_LOG_MARK, "Set recipient to %s", recipientBuf);
             trans->recipient = recipientBuf;
@@ -189,16 +213,21 @@ static int osrfHttpTranslatorSetTo(osrfHttpTranslator* trans) {
             sessionCache = osrfCacheGetObject(trans->thread);
 
             if(sessionCache) {
-                char* ipAddr = jsonObjectGetString(jsonObjectGetKey(sessionCache, "ip"));
-                char* recipient = jsonObjectGetString(jsonObjectGetKey(sessionCache, "jid"));
+                const char* ipAddr = jsonObjectGetString(
+                    jsonObjectGetKeyConst( sessionCache, "ip" ));
+                const char* recipient = jsonObjectGetString(
+                    jsonObjectGetKeyConst( sessionCache, "jid" ));
 
                 // choosing a specific recipient address requires that the recipient and 
                 // thread be cached on the server (so drone processes cannot be hijacked)
                 if(!strcmp(ipAddr, trans->remoteHost) && !strcmp(recipient, trans->recipient)) {
-                    osrfLogDebug(OSRF_LOG_MARK, "Found cached session from host %s and recipient %s", 
+                    osrfLogDebug( OSRF_LOG_MARK,
+                        "Found cached session from host %s and recipient %s",
                         trans->remoteHost, trans->recipient);
                     stat = 1;
-                    trans->service = jsonObjectGetString(jsonObjectGetKey(sessionCache, "service"));
+                    trans->service = apr_pstrdup(
+                        trans->apreq->pool, jsonObjectGetString(
+                            jsonObjectGetKeyConst( sessionCache, "service" )));
 
                 } else {
                     osrfLogError(OSRF_LOG_MARK, 
@@ -218,54 +247,96 @@ static int osrfHttpTranslatorSetTo(osrfHttpTranslator* trans) {
 }
 
 /**
- * Parses the request body and logs any REQUEST messages to the activity log
+ * Parses the request body, logs any REQUEST messages to the activity log, 
+ * stamps the translator ingress on each message, and returns the updated 
+ * messages as a JSON string.
  */
-static int osrfHttpTranslatorParseRequest(osrfHttpTranslator* trans) {
+static char* osrfHttpTranslatorParseRequest(osrfHttpTranslator* trans) {
     osrfMessage* msg;
     osrfMessage* msgList[MAX_MSGS_PER_PACKET];
     int numMsgs = osrf_message_deserialize(trans->body, msgList, MAX_MSGS_PER_PACKET);
     osrfLogDebug(OSRF_LOG_MARK, "parsed %d opensrf messages in this packet", numMsgs);
 
     if(numMsgs == 0)
-        return 0;
-
-    if(numMsgs == 1) {
-        msg = msgList[0];
-        if(msg->m_type == CONNECT) {
-            trans->connectOnly = 1;
-            return 1;
-        }
-        if(msg->m_type == DISCONNECT) {
-            trans->disconnectOnly = 1;
-            return 1;
-        }
-    }
+        return NULL;
 
     // log request messages to the activity log
     int i;
     for(i = 0; i < numMsgs; i++) {
         msg = msgList[i];
-        if(msg->m_type == REQUEST) {
-
-            jsonObject* params = msg->_params;
-            growing_buffer* act = buffer_init(128);    
-            buffer_fadd(act, "[%s] [%s] %s %s", trans->remoteHost, "", trans->service, msg->method_name);
-
-            char* str; 
-            int i = 0;
-            while((str = jsonObjectGetString(jsonObjectGetIndex(params, i++)))) {
-                if( i == 1 )
-                    OSRF_BUFFER_ADD(act, " ");
-                else 
-                    OSRF_BUFFER_ADD(act, ", ");
-                OSRF_BUFFER_ADD(act, str);
+        osrfMessageSetIngress(msg, TRANSLATOR_INGRESS);
+
+        switch(msg->m_type) {
+
+            case REQUEST: {
+                const jsonObject* params = msg->_params;
+                growing_buffer* act = buffer_init(128);        
+                char* method = msg->method_name;
+                buffer_fadd(act, "[%s] [%s] %s %s", trans->remoteHost, "",
+                    trans->service, method);
+
+                const jsonObject* obj = NULL;
+                int i = 0;
+                const char* str;
+                int redactParams = 0;
+                while( (str = osrfStringArrayGetString(log_protect_arr, i++)) ) {
+                    //osrfLogInternal(OSRF_LOG_MARK, "Checking for log protection [%s]", str);
+                    if(!strncmp(method, str, strlen(str))) {
+                        redactParams = 1;
+                        break;
+                    }
+                }
+                if(redactParams) {
+                    OSRF_BUFFER_ADD(act, " **PARAMS REDACTED**");
+                } else {
+                    i = 0;
+                    while((obj = jsonObjectGetIndex(params, i++))) {
+                        str = jsonObjectToJSON(obj);
+                        if( i == 1 )
+                            OSRF_BUFFER_ADD(act, " ");
+                        else
+                            OSRF_BUFFER_ADD(act, ", ");
+                        OSRF_BUFFER_ADD(act, str);
+                        free((void *)str);
+                    }
+                }
+                osrfLogActivity(OSRF_LOG_MARK, "%s", act->buf);
+                buffer_free(act);
+                break;
             }
-            osrfLogActivity(OSRF_LOG_MARK, act->buf);
-            buffer_free(act);
+
+            case CONNECT:
+                trans->connecting = 1;
+                if (numMsgs == 1) 
+                    trans->connectOnly = 1;
+                break;
+
+            case DISCONNECT:
+                trans->disconnecting = 1;
+                if (numMsgs == 1) 
+                    trans->disconnectOnly = 1;
+                break;
+
+            case RESULT:
+                osrfLogWarning( OSRF_LOG_MARK, "Unexpected RESULT message received" );
+                break;
+
+            case STATUS:
+                osrfLogWarning( OSRF_LOG_MARK, "Unexpected STATUS message received" );
+                break;
+
+            default:
+                osrfLogWarning( OSRF_LOG_MARK, "Invalid message type %d received",
+                    msg->m_type );
+                break;
         }
     }
 
-    return 1;
+    char* jsonString = osrfMessageSerializeBatch(msgList, numMsgs);
+    for(i = 0; i < numMsgs; i++) {
+        osrfMessageFree(msgList[i]);
+    }
+    return jsonString;
 }
 
 static int osrfHttpTranslatorCheckStatus(osrfHttpTranslator* trans, transport_message* msg) {
@@ -295,36 +366,42 @@ static void osrfHttpTranslatorInitHeaders(osrfHttpTranslator* trans, transport_m
     if(trans->multipart) {
         sprintf(contentTypeBuf, MULTIPART_CONTENT_TYPE, trans->delim);
         contentTypeBuf[79] = '\0';
-        osrfLogDebug(OSRF_LOG_MARK, "content type %s : %s : %s", MULTIPART_CONTENT_TYPE, trans->delim, contentTypeBuf);
-           ap_set_content_type(trans->apreq, contentTypeBuf);
+        osrfLogDebug(OSRF_LOG_MARK, "content type %s : %s : %s", MULTIPART_CONTENT_TYPE,
+        trans->delim, contentTypeBuf);
+        ap_set_content_type(trans->apreq, contentTypeBuf);
         ap_rprintf(trans->apreq, "--%s\n", trans->delim);
     } else {
-           ap_set_content_type(trans->apreq, JSON_CONTENT_TYPE);
+        ap_set_content_type(trans->apreq, JSON_CONTENT_TYPE);
     }
 }
 
-static void osrfHttpTranslatorCacheSession(osrfHttpTranslator* trans) {
+/**
+ * Cache the transaction with the JID of the backend process we are talking to
+ */
+static void osrfHttpTranslatorCacheSession(osrfHttpTranslator* trans, const char* jid) {
     jsonObject* cacheObj = jsonNewObject(NULL);
     jsonObjectSetKey(cacheObj, "ip", jsonNewObject(trans->remoteHost));
-    jsonObjectSetKey(cacheObj, "jid", jsonNewObject(trans->recipient));
+    jsonObjectSetKey(cacheObj, "jid", jsonNewObject(jid));
     jsonObjectSetKey(cacheObj, "service", jsonNewObject(trans->service));
-    osrfCachePutObject((char*) trans->thread, cacheObj, CACHE_TIME);
+    osrfCachePutObject(trans->thread, cacheObj, CACHE_TIME);
 }
 
-           
+
 /**
  * Writes a single chunk of multipart/x-mixed-replace content
  */
 static void osrfHttpTranslatorWriteChunk(osrfHttpTranslator* trans, transport_message* msg) {
+    osrfLogInternal(OSRF_LOG_MARK, "sending multipart chunk %s", msg->body);
     ap_rprintf(trans->apreq, 
         "Content-type: %s\n\n%s\n\n", JSON_CONTENT_TYPE, msg->body);
-    osrfLogInternal(OSRF_LOG_MARK, "Apache sending data: Content-type: %s\n\n%s\n\n", JSON_CONTENT_TYPE, msg->body);
+    //osrfLogInternal(OSRF_LOG_MARK, "Apache sending data: Content-type: %s\n\n%s\n\n",
+    //JSON_CONTENT_TYPE, msg->body);
     if(trans->complete) {
         ap_rprintf(trans->apreq, "--%s--\n", trans->delim);
-        osrfLogInternal(OSRF_LOG_MARK, "Apache sending data: --%s--\n", trans->delim);
+        //osrfLogInternal(OSRF_LOG_MARK, "Apache sending data: --%s--\n", trans->delim);
     } else {
         ap_rprintf(trans->apreq, "--%s\n", trans->delim);
-        osrfLogInternal(OSRF_LOG_MARK, "Apache sending data: --%s\n", trans->delim);
+        //osrfLogInternal(OSRF_LOG_MARK, "Apache sending data: --%s\n", trans->delim);
     }
     ap_rflush(trans->apreq);
 }
@@ -336,7 +413,8 @@ static int osrfHttpTranslatorProcess(osrfHttpTranslator* trans) {
     if(!osrfHttpTranslatorSetTo(trans))
         return HTTP_BAD_REQUEST;
 
-    if(!osrfHttpTranslatorParseRequest(trans))
+    char* jsonBody = osrfHttpTranslatorParseRequest(trans);
+    if (NULL == jsonBody)
         return HTTP_BAD_REQUEST;
 
     while(client_recv(trans->handle, 0))
@@ -344,13 +422,15 @@ static int osrfHttpTranslatorProcess(osrfHttpTranslator* trans) {
 
     // send the message to the recipient
     transport_message* tmsg = message_init(
-        trans->body, NULL, trans->thread, trans->recipient, NULL);
+        jsonBody, NULL, trans->thread, trans->recipient, NULL);
     message_set_osrf_xid(tmsg, osrfLogGetXid());
     client_send_message(trans->handle, tmsg);
     message_free(tmsg); 
+    free(jsonBody);
 
     if(trans->disconnectOnly) {
         osrfLogDebug(OSRF_LOG_MARK, "exiting early on disconnect");
+        osrfCacheRemove(trans->thread);
         return OK;
     }
 
@@ -361,6 +441,7 @@ static int osrfHttpTranslatorProcess(osrfHttpTranslator* trans) {
 
         if(trans->handle->error) {
             osrfLogError(OSRF_LOG_MARK, "Transport error");
+            osrfCacheRemove(trans->thread);
             return HTTP_INTERNAL_SERVER_ERROR;
         }
 
@@ -369,6 +450,7 @@ static int osrfHttpTranslatorProcess(osrfHttpTranslator* trans) {
 
         if(msg->is_error) {
             osrfLogError(OSRF_LOG_MARK, "XMPP message resulted in error code %d", msg->error_code);
+            osrfCacheRemove(trans->thread);
             return HTTP_NOT_FOUND;
         }
 
@@ -377,7 +459,8 @@ static int osrfHttpTranslatorProcess(osrfHttpTranslator* trans) {
 
         if(firstWrite) {
             osrfHttpTranslatorInitHeaders(trans, msg);
-            osrfHttpTranslatorCacheSession(trans);
+            if(trans->connecting)
+                osrfHttpTranslatorCacheSession(trans, msg->sender);
             firstWrite = 0;
         }
 
@@ -392,23 +475,26 @@ static int osrfHttpTranslatorProcess(osrfHttpTranslator* trans) {
 
             if(trans->complete || trans->connectOnly) {
                 growing_buffer* buf = buffer_init(128);
-                int i;
+                unsigned int i;
                 OSRF_BUFFER_ADD(buf, osrfListGetIndex(trans->messages, 0));
                 for(i = 1; i < trans->messages->size; i++) {
                     buffer_chomp(buf); // chomp off the closing array bracket
                     char* body = osrfListGetIndex(trans->messages, i);
                     char newbuf[strlen(body)];
-                    sprintf(newbuf, body+1); // chomp off the opening array bracket
+                    sprintf(newbuf, "%s", body+1); // chomp off the opening array bracket
                     OSRF_BUFFER_ADD_CHAR(buf, ',');
                     OSRF_BUFFER_ADD(buf, newbuf);
                 }
-                
+
                 ap_rputs(buf->buf, trans->apreq);
                 buffer_free(buf);
             }
         }
     }
 
+    if(trans->disconnecting) // DISCONNECT within a multi-message batch
+        osrfCacheRemove(trans->thread);
+
     return OK;
 }
 
@@ -421,11 +507,14 @@ static void testConnection(request_rec* r) {
        }
 }
 
+#if 0
+// Disabled via #if 0 to avoid a compiler warning
 // it's dead, Jim
 static apr_status_t childExit(void* data) {
     osrf_system_shutdown();
     return OK;
 }
+#endif
 
 static void childInit(apr_pool_t *p, server_rec *s) {
        if(!osrfSystemBootstrapClientResc(configFile, configCtx, "translator")) {
@@ -440,6 +529,9 @@ static void childInit(apr_pool_t *p, server_rec *s) {
     osrfCacheInit(servers, 1, 86400);
        osrfConnected = 1;
 
+    allowedOrigins = osrfNewStringArray(4);
+    osrfConfigGetValueList(NULL, allowedOrigins, "/cross_origin/origin");
+
     // at pool destroy time (= child exit time), cleanup
     // XXX causes us to disconnect even for clone()'d process cleanup (as in mod_cgi)
     //apr_pool_cleanup_register(p, NULL, childExit, apr_pool_cleanup_null);
@@ -454,10 +546,12 @@ static int handler(request_rec *r) {
        r->allowed |= (AP_METHOD_BIT << M_POST);
 
        osrfLogSetAppname("osrf_http_translator");
+       osrfAppSessionSetIngress(TRANSLATOR_INGRESS);
     testConnection(r);
+    crossOriginHeaders(r, allowedOrigins);
 
-    osrfHttpTranslator* trans = osrfNewHttpTranslator(r);
        osrfLogMkXid();
+    osrfHttpTranslator* trans = osrfNewHttpTranslator(r);
     if(trans->body) {
         stat = osrfHttpTranslatorProcess(trans);
         //osrfHttpTranslatorDebug(trans);
@@ -482,10 +576,6 @@ module AP_MODULE_DECLARE_DATA osrf_http_translator_module = {
        NULL,
     NULL,
        NULL,
-    NULL,
+    osrfHttpTranslatorCmds,
        registerHooks,
 };
-
-
-
-