LP#1709710: Make chunk sizing smart about XML quoting
[opensrf-equinox.git] / src / libopensrf / osrf_application.c
index 6eba7be..4a5f53e 100644 (file)
 #define OSRF_SYSMETHOD_ECHO_ATOMIC              "opensrf.system.echo.atomic"
 /*@}*/
 
-#define OSRF_MSG_BUFFER_SIZE     10240
+/**
+       @name Method options
+       @brief Macros that get OR'd together to form method options.
+
+       These options are in addition to the ones stipulated by the caller of
+       osrfRegisterMethod(), and are not externally visible.
+*/
+/*@{*/
+/**
+       @brief  Marks a method as a system method.
+
+       System methods are implemented by generic functions, called via static linkage.  They
+       are not loaded or executed from shared objects.
+*/
+#define OSRF_METHOD_SYSTEM          1
+/**
+       @brief  Combines all responses into a single RESULT message.
+
+       For a @em non-atomic method, the server returns each response to the client in a
+       separate RESULT message.  It sends a STATUS message at the end to signify the end of the
+       message stream.
+
+       For an @em atomic method, the server buffers all responses until the method returns,
+       and then sends them all at once in a single RESULT message (followed by a STATUS message).
+       Each individual response is encoded as an entry in a JSON array.  This buffering is
+       transparent to the function that implements the method.
+
+       Atomic methods incur less networking overhead than non-atomic methods, at the risk of
+       creating excessively large RESULT messages.  The HTTP gateway requires the atomic versions
+       of streaming methods because of the stateless nature of the HTTP protocol.
+
+       If OSRF_METHOD_STREAMING is set for a method, the application generates both an atomic
+       and a non-atomic method, whose names are identical except that the atomic one carries a
+       suffix of ".atomic".
+*/
+#define OSRF_METHOD_ATOMIC          4
+/*@}*/
 
 /**
        @brief Represent an Application.
@@ -59,20 +95,24 @@ typedef struct {
        void (*onExit) (void);      /**< Exit handler for the application. */
 } osrfApplication;
 
-static osrfMethod* _osrfAppBuildMethod( const char* methodName, const char* symbolName,
-               const char* notes, int argc, int options, void* );
+static void register_method( osrfApplication* app, const char* methodName,
+       const char* symbolName, const char* notes, int argc, int options, void * user_data );
+static osrfMethod* build_method( const char* methodName, const char* symbolName,
+       const char* notes, int argc, int options, void* );
 static void osrfAppSetOnExit(osrfApplication* app, const char* appName);
-static void _osrfAppRegisterSysMethods( const char* app );
+static void register_system_methods( osrfApplication* app );
 static inline osrfApplication* _osrfAppFindApplication( const char* name );
 static inline osrfMethod* osrfAppFindMethod( osrfApplication* app, const char* methodName );
 static int _osrfAppRespond( osrfMethodContext* context, const jsonObject* data, int complete );
 static int _osrfAppPostProcess( osrfMethodContext* context, int retcode );
 static int _osrfAppRunSystemMethod(osrfMethodContext* context);
 static void _osrfAppSetIntrospectMethod( osrfMethodContext* ctx, const osrfMethod* method,
-               jsonObject* resp );
+       jsonObject* resp );
 static int osrfAppIntrospect( osrfMethodContext* ctx );
 static int osrfAppIntrospectAll( osrfMethodContext* ctx );
 static int osrfAppEcho( osrfMethodContext* ctx );
+static void osrfMethodFree( char* name, void* p );
+static void osrfAppFree( char* name, void* p );
 
 /**
        @brief Registry of applications.
@@ -97,27 +137,35 @@ int osrfAppRegisterApplication( const char* appName, const char* soFile ) {
 
        osrfLogSetAppname( appName );
 
-       if( !_osrfAppHash )
+       if( !_osrfAppHash ) {
                _osrfAppHash = osrfNewHash();
+               osrfHashSetCallback( _osrfAppHash, osrfAppFree );
+       }
 
        osrfLogInfo( OSRF_LOG_MARK, "Registering application %s with file %s", appName, soFile );
 
+       // Open the shared object.
        void* handle = dlopen( soFile, RTLD_NOW );
        if( ! handle ) {
                const char* msg = dlerror();
-               osrfLogWarning( OSRF_LOG_MARK, "Failed to dlopen library file %s: %s", soFile, msg );
+               osrfLogError( OSRF_LOG_MARK, "Failed to dlopen library file %s: %s", soFile, msg );
                return -1;
        }
 
+       // Construct the osrfApplication.
        osrfApplication* app = safe_malloc(sizeof(osrfApplication));
        app->handle = handle;
-       app->onExit = NULL;
        app->methods = osrfNewHash();
+       osrfHashSetCallback( app->methods, osrfMethodFree );
+       app->onExit = NULL;
+
+       // Add the newly-constructed app to the list.
        osrfHashSet( _osrfAppHash, app, appName );
 
-       /* see if we can run the initialize method */
+       // Try to run the initialize method.  Typically it will register one or more
+       // methods of the application.
        int (*init) (void);
-       *(void **) (&init) = dlsym(app->handle, "osrfAppInitialize");
+       *(void **) (&init) = dlsym( handle, "osrfAppInitialize" );
 
        if( (error = dlerror()) != NULL ) {
                osrfLogWarning( OSRF_LOG_MARK,
@@ -130,18 +178,15 @@ int osrfAppRegisterApplication( const char* appName, const char* soFile ) {
                int ret;
                if( (ret = (*init)()) ) {
                        osrfLogWarning( OSRF_LOG_MARK, "Application %s returned non-zero value from "
-                                       "'osrfAppInitialize', not registering...", appName );
-                       //free(app->name); /* need a method to remove an application from the list */
-                       //free(app);
+                               "'osrfAppInitialize', not registering...", appName );
+                       osrfHashRemove( _osrfAppHash, appName );
                        return ret;
                }
        }
 
-       _osrfAppRegisterSysMethods(appName);
-
+       register_system_methods( app );
        osrfLogInfo( OSRF_LOG_MARK, "Application %s registered successfully", appName );
-
-       osrfAppSetOnExit(app, appName);
+       osrfAppSetOnExit( app, appName );
 
        return 0;
 }
@@ -240,9 +285,7 @@ void osrfAppRunExitCode( void ) {
 
        The @a options parameter is zero or more of the following macros, OR'd together:
 
-       - OSRF_METHOD_SYSTEM        called by static linkage (shouldn't be used here)
        - OSRF_METHOD_STREAMING     method may return more than one response
-       - OSRF_METHOD_ATOMIC        return all responses collected in a single RESULT message
        - OSRF_METHOD_CACHABLE      cache results in memcache
 
        If the OSRF_METHOD_STREAMING bit is set, also register an ".atomic" version of the method.
@@ -262,7 +305,7 @@ int osrfAppRegisterMethod( const char* appName, const char* methodName,
 }
 
 /**
-       @brief Register a method for a specified application.
+       @brief Register an extended method for a specified application.
 
        @param appName Name of the application that implements the method.
        @param methodName The fully qualified name of the method.
@@ -271,16 +314,16 @@ int osrfAppRegisterMethod( const char* appName, const char* methodName,
        @param argc How many arguments this method expects.
        @param options Bit switches setting various options.
        @param user_data Opaque pointer to be passed to the dynamically called function.
-       @return Zero on success, or -1 on error.
+       @return Zero if successful, or -1 upon error.
 
        This function is identical to osrfAppRegisterMethod(), except that it also installs
        a method-specific opaque pointer.  When we call the corresponding function at
        run time, this pointer will be available to the function via the method context.
 */
 int osrfAppRegisterExtendedMethod( const char* appName, const char* methodName,
-               const char* symbolName, const char* notes, int argc, int options, void * user_data ) {
+       const char* symbolName, const char* notes, int argc, int options, void * user_data ) {
 
-       if( !appName || ! methodName  ) return -1;
+       if( !appName || ! methodName ) return -1;
 
        osrfApplication* app = _osrfAppFindApplication(appName);
        if(!app) {
@@ -290,24 +333,45 @@ int osrfAppRegisterExtendedMethod( const char* appName, const char* methodName,
 
        osrfLogDebug( OSRF_LOG_MARK, "Registering method %s for app %s", methodName, appName );
 
-       osrfMethod* method = _osrfAppBuildMethod(
-               methodName, symbolName, notes, argc, options, user_data );
-       method->options = options;
+       // Extract the only valid option bits, and ignore the rest.
+       int opts = options & ( OSRF_METHOD_STREAMING | OSRF_METHOD_CACHABLE );
 
-       /* plug the method into the list of methods */
-       osrfHashSet( app->methods, method, method->name );
+       // Build and install a non-atomic method.
+       register_method(
+               app, methodName, symbolName, notes, argc, opts, user_data );
 
-       if( options & OSRF_METHOD_STREAMING ) { /* build the atomic counterpart */
-               int newops = options | OSRF_METHOD_ATOMIC;
-               osrfMethod* atomicMethod = _osrfAppBuildMethod(
-                       methodName, symbolName, notes, argc, newops, user_data );
-               osrfHashSet( app->methods, atomicMethod, atomicMethod->name );
+       if( opts & OSRF_METHOD_STREAMING ) {
+               // Build and install an atomic version of the same method.
+               register_method(
+                       app, methodName, symbolName, notes, argc, opts | OSRF_METHOD_ATOMIC, user_data );
        }
 
        return 0;
 }
 
 /**
+       @brief Register a single method for a specified application.
+
+       @param appName Pointer to the application that implements the method.
+       @param methodName The fully qualified name of the method.
+       @param symbolName The symbol name (function name) that implements the method.
+       @param notes Public documentation for this method.
+       @param argc How many arguments this method expects.
+       @param options Bit switches setting various options.
+       @param user_data Opaque pointer to be passed to the dynamically called function.
+*/
+static void register_method( osrfApplication* app, const char* methodName,
+       const char* symbolName, const char* notes, int argc, int options, void * user_data ) {
+
+       if( !app || ! methodName ) return;
+
+       // Build a method and add it to the list of methods
+       osrfMethod* method = build_method(
+               methodName, symbolName, notes, argc, options, user_data );
+       osrfHashSet( app->methods, method, method->name );
+}
+
+/**
        @brief Allocate and populate an osrfMethod.
        @param methodName Name of the method.
        @param symbolName Name of the function that implements the method.
@@ -316,9 +380,11 @@ int osrfAppRegisterExtendedMethod( const char* appName, const char* methodName,
        @param options Bit switches setting various options.
        @param user_data An opaque pointer to be passed in the method context.
        @return Pointer to the newly allocated osrfMethod.
+
+       If OSRF_METHOD_ATOMIC is set, append ".atomic" to the method name.
 */
-static osrfMethod* _osrfAppBuildMethod( const char* methodName, const char* symbolName,
-               const char* notes, int argc, int options, void* user_data ) {
+static osrfMethod* build_method( const char* methodName, const char* symbolName,
+       const char* notes, int argc, int options, void* user_data ) {
 
        osrfMethod* method      = safe_malloc(sizeof(osrfMethod));
 
@@ -326,11 +392,10 @@ static osrfMethod* _osrfAppBuildMethod( const char* methodName, const char* symb
                methodName = "";  // should never happen
 
        if( options & OSRF_METHOD_ATOMIC ) {
-               // Append ".atomic" to the name, and make the method atomic
+               // Append ".atomic" to the name.
                char mb[ strlen( methodName ) + 8 ];
                sprintf( mb, "%s.atomic", methodName );
                method->name        = strdup( mb );
-               options |= OSRF_METHOD_STREAMING;
        } else {
                method->name        = strdup(methodName);
        }
@@ -351,34 +416,93 @@ static osrfMethod* _osrfAppBuildMethod( const char* methodName, const char* symb
        if(user_data)
                method->userData    = user_data;
 
+       method->max_bundle_size = OSRF_MSG_BUNDLE_SIZE;
+    method->max_chunk_size  = OSRF_MSG_CHUNK_SIZE;
        return method;
 }
 
 /**
+       @brief Set the effective output buffer size for a given method.
+       @param appName Name of the application.
+       @param methodName Name of the method.
+       @param max_bundle_size Desired size of the output buffer, in bytes.
+       @return Zero if successful, or -1 if the specified method cannot be found.
+
+       A smaller buffer size may result in a lower latency for the first response, since we don't
+       wait for as many messages to accumulate before flushing the output buffer.  On the other
+       hand a larger buffer size may result in higher throughput due to lower network overhead.
+
+       Since the buffer size is not an absolute limit, it may be set to zero, in which case each
+       output transport message will contain no more than one RESULT message.
+
+       This function has no effect on atomic methods, because all responses are sent in a single
+       message anyway.  Likewise it has no effect on a method that returns only a single response.
+*/
+int osrfMethodSetBundleSize( const char* appName, const char* methodName, size_t max_bundle_size ) {
+       osrfMethod* method = _osrfAppFindMethod( appName, methodName );
+       if( method ) {
+               osrfLogInfo( OSRF_LOG_MARK,
+                       "Setting outbuf buffer size to %lu for method %s of application %s",
+                       (unsigned long) max_bundle_size, methodName, appName );
+               method->max_bundle_size = max_bundle_size;
+               return 0;
+       } else {
+               osrfLogWarning( OSRF_LOG_MARK,
+                       "Unable to set outbuf buffer size to %lu for method %s of application %s",
+                       (unsigned long) max_bundle_size, methodName, appName );
+               return -1;
+       }
+}
+
+/**
        @brief Register all of the system methods for this application.
-       @param app Application name.
+       @param app Pointer to the application.
 
        A client can call these methods the same way it calls application-specific methods,
        but they are implemented by functions here in this module, not by functions in the
        shared object.
 */
-static void _osrfAppRegisterSysMethods( const char* app ) {
-
-       osrfAppRegisterMethod(
-                       app, OSRF_SYSMETHOD_INTROSPECT, NULL,
-                       "Return a list of methods whose names have the same initial "
-                       "substring as that of the provided method name PARAMS( methodNameSubstring )",
-                       1, OSRF_METHOD_SYSTEM | OSRF_METHOD_STREAMING );
-
-       osrfAppRegisterMethod(
-                       app, OSRF_SYSMETHOD_INTROSPECT_ALL, NULL,
-                       "Returns a complete list of methods. PARAMS()", 0,
-                       OSRF_METHOD_SYSTEM | OSRF_METHOD_STREAMING );
-
-       osrfAppRegisterMethod(
-                       app, OSRF_SYSMETHOD_ECHO, NULL,
-                       "Echos all data sent to the server back to the client. PARAMS([a, b, ...])", 0,
-                       OSRF_METHOD_SYSTEM | OSRF_METHOD_STREAMING );
+static void register_system_methods( osrfApplication* app ) {
+
+       if( !app ) return;
+
+       register_method(
+               app, OSRF_SYSMETHOD_INTROSPECT, NULL,
+               "Return a list of methods whose names have the same initial "
+               "substring as that of the provided method name PARAMS( methodNameSubstring )",
+               1, OSRF_METHOD_SYSTEM | OSRF_METHOD_STREAMING,
+               NULL );
+
+       register_method(
+               app, OSRF_SYSMETHOD_INTROSPECT, NULL,
+               "Return a list of methods whose names have the same initial "
+               "substring as that of the provided method name PARAMS( methodNameSubstring )",
+               1, OSRF_METHOD_SYSTEM | OSRF_METHOD_STREAMING | OSRF_METHOD_ATOMIC,
+               NULL );
+
+       register_method(
+               app, OSRF_SYSMETHOD_INTROSPECT_ALL, NULL,
+               "Returns a complete list of methods. PARAMS()",
+               0, OSRF_METHOD_SYSTEM | OSRF_METHOD_STREAMING,
+               NULL );
+
+       register_method(
+               app, OSRF_SYSMETHOD_INTROSPECT_ALL, NULL,
+               "Returns a complete list of methods. PARAMS()",
+               0, OSRF_METHOD_SYSTEM | OSRF_METHOD_STREAMING | OSRF_METHOD_ATOMIC,
+               NULL );
+
+       register_method(
+               app, OSRF_SYSMETHOD_ECHO, NULL,
+               "Echos all data sent to the server back to the client. PARAMS([a, b, ...])",
+               0, OSRF_METHOD_SYSTEM | OSRF_METHOD_STREAMING,
+               NULL );
+
+       register_method(
+               app, OSRF_SYSMETHOD_ECHO, NULL,
+               "Echos all data sent to the server back to the client. PARAMS([a, b, ...])",
+               0, OSRF_METHOD_SYSTEM | OSRF_METHOD_STREAMING | OSRF_METHOD_ATOMIC,
+               NULL );
 }
 
 /**
@@ -609,29 +733,49 @@ static int _osrfAppRespond( osrfMethodContext* ctx, const jsonObject* data, int
                        "Adding responses to stash for method %s", ctx->method->name );
 
                if( data ) {
-                       // If you want to flush the intput buffers for every output message,
-                       // this is the place to do it.
-                       //osrf_app_session_queue_wait( ctx->session, 0, NULL );
-
-                       // Create an OSRF message
-                       osrfMessage* msg = osrf_message_init( RESULT, ctx->request, 1 );
-                       osrf_message_set_status_info( msg, NULL, "OK", OSRF_STATUS_OK );
-                       osrf_message_set_result( msg, data );
-
-                       // Serialize the OSRF message into JSON text
-                       char* json = jsonObjectToJSON( osrfMessageToJSON( msg ));
-                       osrfMessageFree( msg );
-
-                       // If the new message would overflow the buffer, flush the output buffer first
-                       int len_so_far = buffer_length( ctx->session->outbuf );
-                       if( len_so_far && (strlen( json ) + len_so_far >= OSRF_MSG_BUFFER_SIZE - 3) ) {
-                               if( flush_responses( ctx->session, ctx->session->outbuf ))
-                                       return -1;
-                       }
+            char* data_str = jsonObjectToJSON(data); // free me (below)
+            size_t raw_size = strlen(data_str);
+            size_t extra_size = osrfXmlEscapingLength(data_str);
+            size_t data_size = raw_size + extra_size;
+            size_t chunk_size = ctx->method->max_chunk_size;
 
-                       // Append the JSON text to the output buffer
-                       append_msg( ctx->session->outbuf, json );
-                       free( json );
+            if (data_size > chunk_size) // calculate an escape-scaled chunk size
+                chunk_size = ((double)raw_size / (double)data_size) * (double)chunk_size;
+
+            if (chunk_size > 0 && chunk_size < raw_size) {
+                // chunking -- response message exceeds max message size.
+                // break it up into chunks for partial delivery
+
+                               osrfSendChunkedResult(ctx->session, ctx->request,
+                                                                         data_str, raw_size, chunk_size);
+
+            } else {
+
+                // bundling -- message body (may be) too small for single
+                // delivery.  prepare message for bundling.
+
+                // Create an OSRF message
+                osrfMessage* msg = osrf_message_init( RESULT, ctx->request, 1 );
+                osrf_message_set_status_info( msg, NULL, "OK", OSRF_STATUS_OK );
+                osrf_message_set_result( msg, data );
+
+                // Serialize the OSRF message into JSON text
+                char* json = jsonObjectToJSON( osrfMessageToJSON( msg ));
+                osrfMessageFree( msg );
+
+                // If the new message would overflow the buffer, flush the output buffer first
+                int len_so_far = buffer_length( ctx->session->outbuf );
+                if( len_so_far && (strlen( json ) + len_so_far + 3 >= ctx->method->max_bundle_size )) {
+                    if( flush_responses( ctx->session, ctx->session->outbuf ))
+                        return -1;
+                }
+
+                // Append the JSON text to the output buffer
+                append_msg( ctx->session->outbuf, json );
+                free( json );
+            }
+
+            free(data_str);
                }
 
                if(complete) {
@@ -907,9 +1051,65 @@ int osrfMethodVerifyContext( osrfMethodContext* ctx )
        // Log the call, with the method and parameters
        char* params_str = jsonObjectToJSON( ctx->params );
        if( params_str ) {
-               osrfLogInfo( OSRF_LOG_MARK, "CALL:\t%s %s - %s",
-                        ctx->session->remote_service, ctx->method->name, params_str );
+               // params_str will at minimum be "[]"
+               int i = 0;
+               const char* str;
+               char* method = ctx->method->name;
+               int redact_params = 0;
+               while( (str = osrfStringArrayGetString(log_protect_arr, i++)) ) {
+                       //osrfLogInternal(OSRF_LOG_MARK, "Checking for log protection [%s]", str);
+                       if(!strncmp(method, str, strlen(str))) {
+                               redact_params = 1;
+                               break;
+                       }
+               }
+
+               char* params_logged;
+               if(redact_params) {
+                       params_logged = strdup("**PARAMS REDACTED**");
+               } else {
+                       params_str[strlen(params_str) - 1] = '\0'; // drop the trailing ']'
+                       params_logged = strdup(params_str + 1);
+               }
                free( params_str );
+               osrfLogInfo( OSRF_LOG_MARK, "CALL: %s %s %s",
+                       ctx->session->remote_service, ctx->method->name, params_logged);
+               free( params_logged );
        }
        return 0;
 }
+
+/**
+       @brief Free an osrfMethod.
+       @param name Name of the method (not used).
+       @param p Void pointer pointing to the osrfMethod.
+
+       This function is designed to be installed as a callback for an osrfHash (hence the
+       unused @a name parameter and the void pointer).
+*/
+static void osrfMethodFree( char* name, void* p ) {
+       osrfMethod* method = p;
+       if( method ) {
+               free( method->name );
+               free( method->symbol );
+               free( method->notes );
+               free( method );
+       }
+}
+
+/**
+       @brief Free an osrfApplication
+       @param name Name of the application (not used).
+       @param p Void pointer pointing to the osrfApplication.
+
+       This function is designed to be installed as a callback for an osrfHash (hence the
+       unused @a name parameter and the void pointer).
+*/
+static void osrfAppFree( char* name, void* p ) {
+       osrfApplication* app = p;
+       if( app ) {
+               dlclose( app->handle );
+               osrfHashFree( app->methods );
+               free( app );
+       }
+}