Implement the chunking of OSRF messages. I.e. bundle multiple
authorscottmk <scottmk@9efc2488-bf62-4759-914b-345cdb29e865>
Wed, 4 Aug 2010 03:20:33 +0000 (03:20 +0000)
committerscottmk <scottmk@9efc2488-bf62-4759-914b-345cdb29e865>
Wed, 4 Aug 2010 03:20:33 +0000 (03:20 +0000)
OSRF messages into an XMPP message, up to about 10k bytes, so
as to reduce networking overhead.

M    include/opensrf/osrf_application.h
M    src/libopensrf/osrf_application.c

git-svn-id: svn://svn.open-ils.org/OpenSRF/trunk@1988 9efc2488-bf62-4759-914b-345cdb29e865

include/opensrf/osrf_application.h
src/libopensrf/osrf_application.c

index 0bbb69f..433c42d 100644 (file)
@@ -4,6 +4,31 @@
 /**
        @file osrf_application.h
        @brief Routines to load and manage shared object libraries.
+
+       Every method of a service is implemented by a C function.  In a few cases those
+       functions are generic to all services.  In other cases they are loaded and executed from
+       a shared object library that is specific to the application offering the service,  A
+       registry maps method names to function names so that we can call the right function.
+
+       Each such function has a similar signature:
+
+               int method_name( osrfMethodContext* ctx );
+
+       The return value is negative in case of an error.  A return code of zero implies that
+       the method has already sent the client a STATUS message to say that it is finished.
+       A return code greater than zero implies that the method has not sent such a STATUS
+       message, so we need to do so after the method returns.
+
+       Any arguments passed to the method are bundled together in a jsonObject inside the
+       osrfMethodContext.
+
+       An application's shared object may also implement any or all of three standard functions:
+
+       - int osrfAppInitialize( void ) Called when an application is registered
+       - int osrfAppChildInit( void ) Called when a server drone is spawned
+       - void osrfAppChildExit( void ) Called when a server drone terminates
+
+       osrfAppInitialize() and osrfAppChild return zero if successful, and non-zero if not.
 */
 
 #include <opensrf/utils.h>
@@ -68,18 +93,65 @@ extern "C" {
 #define OSRF_METHOD_VERIFY_CONTEXT(d) _OSRF_METHOD_VERIFY_CONTEXT(d);
 #endif
 
+/**
+       @name Method options
+       @brief Macros that get OR'd together to form method options.
+*/
+/*@{*/
+/**
+       @brief  Marks a method as a system method.
+
+       System methods are implemented by generic functions, called via static linkage.  They
+       are not loaded or executed from shared objects.
+*/
 #define OSRF_METHOD_SYSTEM          1
+/**
+       @brief Notes that the method may return more than one result.
+
+       For a @em streaming method, we register both an atomic method and a non-atomic method.
+       See also OSRF_METHOD_ATOMIC.
+*/
 #define OSRF_METHOD_STREAMING       2
+/**
+       @brief  Combines all responses into a single RESULT message.
+
+       For a @em non-atomic method, the server returns each response to the client in a
+       separate RESULT message.  It sends a STATUS message at the end to signify the end of the
+       message stream.
+
+       For an @em atomic method, the server buffers all responses until the method returns,
+       and then sends them all at once in a single RESULT message (followed by a STATUS message).
+       Each individual response is encoded as an entry in a JSON array.  This buffering is
+       transparent to the function that implements the method.
+
+       Atomic methods incur less networking overhead than non-atomic methods, at the risk of
+       creating excessively large RESULT messages.  The HTTP gateway requires the atomic versions
+       of streaming methods because of the stateless nature of the HTTP protocol.
+
+       If OSRF_METHOD_STREAMING is set for a method, the application generates both an atomic
+       and a non-atomic method, whose names are identical except that the atomic one carries a
+       suffix of ".atomic".
+*/
 #define OSRF_METHOD_ATOMIC          4
+/**
+       @brief  Notes that a previous result to the same call may be available in memcache.
+
+       Before calling the registered function, a cachable method checks memcache for a previously
+       determined result for the same call.  If no such result is available, it calls the
+       registered function and caches the new result before returning.
+
+       This caching is not currently implemented for C methods.
+*/
 #define OSRF_METHOD_CACHABLE        8
+/*@}*/
 
 typedef struct {
-       char* name;                 /**< the method name. */
-       char* symbol;               /**< the symbol name (function name). */
-       char* notes;                /**< public method documentation. */
-       int argc;                   /**< how many args this method expects. */
+       char* name;                 /**< Method name. */
+       char* symbol;               /**< Symbol name (function name) within the shared object. */
+       char* notes;                /**< Public method documentation. */
+       int argc;                   /**< The minimum number of arguments for the method. */
        //char* paramNotes;         /**< Description of the params expected for this method. */
-       int options;                /**< bitswitches setting various options for this method. */
+       int options;                /**< Bit switches setting various options for this method. */
        void* userData;             /**< Opaque pointer to application-specific data. */
 
        /*
@@ -91,35 +163,15 @@ typedef struct {
 } osrfMethod;
 
 typedef struct {
-       osrfAppSession* session;    /**< the current session. */
-       osrfMethod* method;         /**< the requested method. */
-       jsonObject* params;         /**< the params to the method. */
-       int request;                /**< request id. */
-       jsonObject* responses;      /**< array of cached responses. */
+       osrfAppSession* session;    /**< Pointer to the current application session. */
+       osrfMethod* method;         /**< Pointer to the requested method. */
+       jsonObject* params;         /**< Parameters to the method. */
+       int request;                /**< Request id. */
+       jsonObject* responses;      /**< Array of cached responses. */
 } osrfMethodContext;
 
-/**
-       Register an application
-       @param appName The name of the application
-       @param soFile The library (.so) file that implements this application
-       @return 0 on success, -1 on error
-*/
 int osrfAppRegisterApplication( const char* appName, const char* soFile );
 
-/**
-       @brief Register a method for a given application.
-       
-       @param appName Name of the application that implements the method.
-       @param methodName The fully qualified name of the method.
-       @param symbolName The symbol name (function name) that implements the method.
-       @param notes Public documentation for this method.
-       @params argc The number of arguments this method expects.
-       @param options Bit switches setting various options.
-       @return 0 on success, -1 on error
-
-       Any method with  the OSRF_METHOD_STREAMING option set will have a ".atomic"
-       version of the method registered automatically.
-*/
 int osrfAppRegisterMethod( const char* appName, const char* methodName,
                const char* symbolName, const char* notes, int argc, int options );
 
@@ -128,41 +180,19 @@ int osrfAppRegisterExtendedMethod( const char* appName, const char* methodName,
 
 osrfMethod* _osrfAppFindMethod( const char* appName, const char* methodName );
 
-/**
-       Runs the specified method for the specified application.
-       @param appName The name of the application who's method to run
-       @param methodName The name of the method to run
-       @param ses The app session attached to this request
-       @params reqId The request id for this request
-       @param params The method parameters
-*/
 int osrfAppRunMethod( const char* appName, const char* methodName,
                osrfAppSession* ses, int reqId, jsonObject* params );
 
-/**
-       @brief Respond to the client with a method exception.
-       @param ses The current session.
-       @param request The request id.
-       @param msg The debug message to send to the client.
-       @return 0 on successfully sending of the message, -1 otherwise.
-*/
 int osrfAppRequestRespondException( osrfAppSession* ses, int request, const char* msg, ... );
 
 int osrfAppRespond( osrfMethodContext* context, const jsonObject* data );
-int osrfAppRespondComplete( osrfMethodContext* context, const jsonObject* data );
 
-/* OSRF_METHOD_ATOMIC and/or OSRF_METHOD_CACHABLE and/or 0 for no special options */
-//int osrfAppProcessMethodOptions( char* method );
+int osrfAppRespondComplete( osrfMethodContext* context, const jsonObject* data );
 
-/** Tell the backend process to run its child init function */
 int osrfAppRunChildInit(const char* appname);
 
 void osrfAppRunExitCode( void );
 
-/**
-       Determine whether the context looks healthy.
-       Return 0 if it does, or -1 if it doesn't.
-*/
 int osrfMethodVerifyContext( osrfMethodContext* ctx );
 
 #ifdef __cplusplus
index 836f73b..141d39f 100644 (file)
@@ -48,6 +48,8 @@
 #define OSRF_SYSMETHOD_ECHO_ATOMIC              "opensrf.system.echo.atomic"
 /*@}*/
 
+#define OSRF_MSG_BUFFER_SIZE     10240
+
 /**
        @brief Represent an Application.
 */
@@ -238,13 +240,12 @@ void osrfAppRunExitCode( void ) {
 
        The @a options parameter is zero or more of the following macros, OR'd together:
 
-       - OSRF_METHOD_SYSTEM
-       - OSRF_METHOD_STREAMING
-       - OSRF_METHOD_ATOMIC
-       - OSRF_METHOD_CACHABLE
+       - OSRF_METHOD_SYSTEM        called by static linkage (shouldn't be used here)
+       - OSRF_METHOD_STREAMING     method may return more than one response
+       - OSRF_METHOD_ATOMIC        return all responses collected in a single RESULT message
+       - OSRF_METHOD_CACHABLE      cache results in memcache
 
-       If the OSRF_METHOD_STREAMING bit is set, also register an ".atomic" version of
-       the method.
+       If the OSRF_METHOD_STREAMING bit is set, also register an ".atomic" version of the method.
 */
 int osrfAppRegisterMethod( const char* appName, const char* methodName,
                const char* symbolName, const char* notes, int argc, int options ) {
@@ -325,7 +326,7 @@ static osrfMethod* _osrfAppBuildMethod( const char* methodName, const char* symb
                methodName = "";  // should never happen
 
        if( options & OSRF_METHOD_ATOMIC ) {
-               // Append ".atomic" to the name, and make the method streaming
+               // Append ".atomic" to the name, and make the method atomic
                char mb[ strlen( methodName ) + 8 ];
                sprintf( mb, "%s.atomic", methodName );
                method->name        = strdup( mb );
@@ -486,7 +487,11 @@ int osrfAppRunMethod( const char* appName, const char* methodName,
                return osrfAppRequestRespondException(
                                ses, reqId, "An unknown server error occurred" );
 
-       return _osrfAppPostProcess( &context, retcode );
+       retcode = _osrfAppPostProcess( &context, retcode );
+
+       if( context.responses )
+               jsonObjectFree( context.responses );
+       return retcode;
 }
 
 /**
@@ -526,25 +531,64 @@ int osrfAppRespondComplete( osrfMethodContext* context, const jsonObject* data )
 }
 
 /**
+       @brief Send any response messages that have accumulated in the output buffer.
+       @param ses Pointer to the current application session.
+       @param outbuf Pointer to the output buffer.
+       @return Zero if successful, or -1 if not.
+
+       Used only by servers to respond to clients.
+*/
+static int flush_responses( osrfAppSession* ses, growing_buffer* outbuf ) {
+
+       // Collect any inbound traffic on the socket(s).  This doesn't accomplish anything for the
+       // immediate task at hand, but it may help to keep TCP from getting clogged in some cases.
+       osrf_app_session_queue_wait( ses, 0, NULL );
+
+       int rc = 0;
+       if( buffer_length( outbuf ) > 0 ) {    // If there's anything to send...
+               buffer_add_char( outbuf, ']' );    // Close the JSON array
+               if( osrfSendTransportPayload( ses, OSRF_BUFFER_C_STR( ses->outbuf ))) {
+                       osrfLogError( OSRF_LOG_MARK, "Unable to flush response buffer" );
+                       rc = -1;
+               }
+       }
+       buffer_reset( ses->outbuf );
+       return rc;
+}
+
+/**
+       @brief Add a message to an output buffer.
+       @param outbuf Pointer to the output buffer.
+       @param msg Pointer to the message to be added, in the form of a JSON string.
+
+       Since the output buffer is in the form of a JSON array, prepend a left bracket to the
+       first message, and a comma to subsequent ones.
+
+       Used only by servers to respond to clients.
+*/
+static inline void append_msg( growing_buffer* outbuf, const char* msg ) {
+       if( outbuf && msg ) {
+               char prefix = buffer_length( outbuf ) > 0 ? ',' : '[';
+               buffer_add_char( outbuf, prefix );
+               buffer_add( outbuf, msg );
+       }
+}
+
+/**
        @brief Either send or enqueue a response to a client, optionally with a completion notice.
        @param ctx Pointer to the method context.
        @param data Pointer to the response, in the form of a jsonObject.
        @param complete Boolean: if true, we will accompany the RESULT message with a STATUS
        message indicating that the response is complete.
-       @return Zero if successful, or -1 upon error.  The only recognized errors are if either
-       the @a ctx pointer or its method pointer is NULL.
+       @return Zero if successful, or -1 upon error.
 
        For an atomic method, add a copy of the response data to a cache within the method
        context, to be sent later.  In this case the @a complete parameter has no effect,
        because we'll send the STATUS message later when we send the cached results.
 
-       If the method is cachable but not atomic, do nothing, ignoring the results in @a data.
-       Apparently there are no cachable methods at this writing.  If we ever invent some, we
-       may need to revisit this function.
-
-       If the method is neither atomic nor cachable, then send a RESULT message to the client,
-       with the results in @a data.  If @a complete is true, also send a STATUS message to
-       indicate that the response is complete.
+       If the method is not atomic, translate the message into JSON and append it to a buffer,
+       flushing the buffer as needed to avoid overflow.  If @a complete is true, append
+       a STATUS message (as JSON) to the buffer and flush the buffer.
 */
 static int _osrfAppRespond( osrfMethodContext* ctx, const jsonObject* data, int complete ) {
        if(!(ctx && ctx->method)) return -1;
@@ -560,15 +604,55 @@ static int _osrfAppRespond( osrfMethodContext* ctx, const jsonObject* data, int
                // Add a copy of the data object to the cache.
                if ( data != NULL )
                        jsonObjectPush( ctx->responses, jsonObjectClone(data) );
-       }
+       } else {
+               osrfLogDebug( OSRF_LOG_MARK,
+                       "Adding responses to stash for method %s", ctx->method->name );
+
+               if( data ) {
+                       // If you want to flush the intput buffers for every output message,
+                       // this is the place to do it.
+                       //osrf_app_session_queue_wait( ctx->session, 0, NULL );
+
+                       // Create an OSRF message
+                       osrfMessage* msg = osrf_message_init( RESULT, ctx->request, 1 );
+                       osrf_message_set_status_info( msg, NULL, "OK", OSRF_STATUS_OK );
+                       osrf_message_set_result( msg, data );
+
+                       // Serialize the OSRF message into JSON text
+                       char* json = jsonObjectToJSON( osrfMessageToJSON( msg ));
+                       osrfMessageFree( msg );
+
+                       // If the new message would overflow the buffer, flush the output buffer first
+                       int len_so_far = buffer_length( ctx->session->outbuf );
+                       if( len_so_far && (strlen( json ) + len_so_far >= OSRF_MSG_BUFFER_SIZE - 3) ) {
+                               if( flush_responses( ctx->session, ctx->session->outbuf ))
+                                       return -1;
+                       }
+
+                       // Append the JSON text to the output buffer
+                       append_msg( ctx->session->outbuf, json );
+                       free( json );
+               }
 
-       if( !(ctx->method->options & OSRF_METHOD_ATOMIC ) &&
-                       !(ctx->method->options & OSRF_METHOD_CACHABLE) ) {
+               if(complete) {
+                       // Create a STATUS message
+                       osrfMessage* status_msg = osrf_message_init( STATUS, ctx->request, 1 );
+                       osrf_message_set_status_info( status_msg, "osrfConnectStatus", "Request Complete",
+                               OSRF_STATUS_COMPLETE );
 
-               if(complete)
-                       osrfAppRequestRespondComplete( ctx->session, ctx->request, data );
-               else
-                       osrfAppRequestRespond( ctx->session, ctx->request, data );
+                       // Serialize the STATUS message into JSON text
+                       char* json = jsonObjectToJSON( osrfMessageToJSON( status_msg ));
+                       osrfMessageFree( status_msg );
+
+                       // Add the STATUS message to the output buffer.
+                       // It's short, so don't worry about avoiding overflow.
+                       append_msg( ctx->session->outbuf, json );
+                       free( json );
+
+                       // Flush the output buffer, sending any accumulated messages.
+                       if( flush_responses( ctx->session, ctx->session->outbuf ))
+                               return -1;
+               }
        }
 
        return 0;
@@ -598,8 +682,6 @@ static int _osrfAppPostProcess( osrfMethodContext* ctx, int retcode ) {
                // any responses yet).  Now send them all at once, followed by a STATUS message
                // to say that we're finished.
                osrfAppRequestRespondComplete( ctx->session, ctx->request, ctx->responses );
-               jsonObjectFree(ctx->responses);
-               ctx->responses = NULL;
 
        } else {
                // We have no cached responses to return.