#define OSRF_SYSMETHOD_ECHO_ATOMIC "opensrf.system.echo.atomic"
/*@}*/
-#define OSRF_MSG_BUFFER_SIZE 10240
+/**
+ @name Method options
+ @brief Macros that get OR'd together to form method options.
+
+ These options are in addition to the ones stipulated by the caller of
+ osrfRegisterMethod(), and are not externally visible.
+*/
+/*@{*/
+/**
+ @brief Marks a method as a system method.
+
+ System methods are implemented by generic functions, called via static linkage. They
+ are not loaded or executed from shared objects.
+*/
+#define OSRF_METHOD_SYSTEM 1
+/**
+ @brief Combines all responses into a single RESULT message.
+
+ For a @em non-atomic method, the server returns each response to the client in a
+ separate RESULT message. It sends a STATUS message at the end to signify the end of the
+ message stream.
+
+ For an @em atomic method, the server buffers all responses until the method returns,
+ and then sends them all at once in a single RESULT message (followed by a STATUS message).
+ Each individual response is encoded as an entry in a JSON array. This buffering is
+ transparent to the function that implements the method.
+
+ Atomic methods incur less networking overhead than non-atomic methods, at the risk of
+ creating excessively large RESULT messages. The HTTP gateway requires the atomic versions
+ of streaming methods because of the stateless nature of the HTTP protocol.
+
+ If OSRF_METHOD_STREAMING is set for a method, the application generates both an atomic
+ and a non-atomic method, whose names are identical except that the atomic one carries a
+ suffix of ".atomic".
+*/
+#define OSRF_METHOD_ATOMIC 4
+/*@}*/
/**
@brief Represent an Application.
void (*onExit) (void); /**< Exit handler for the application. */
} osrfApplication;
-static osrfMethod* _osrfAppBuildMethod( const char* methodName, const char* symbolName,
- const char* notes, int argc, int options, void* );
+static void register_method( osrfApplication* app, const char* methodName,
+ const char* symbolName, const char* notes, int argc, int options, void * user_data );
+static osrfMethod* build_method( const char* methodName, const char* symbolName,
+ const char* notes, int argc, int options, void* );
static void osrfAppSetOnExit(osrfApplication* app, const char* appName);
-static void _osrfAppRegisterSysMethods( const char* app );
+static void register_system_methods( osrfApplication* app );
static inline osrfApplication* _osrfAppFindApplication( const char* name );
static inline osrfMethod* osrfAppFindMethod( osrfApplication* app, const char* methodName );
static int _osrfAppRespond( osrfMethodContext* context, const jsonObject* data, int complete );
static int _osrfAppPostProcess( osrfMethodContext* context, int retcode );
static int _osrfAppRunSystemMethod(osrfMethodContext* context);
static void _osrfAppSetIntrospectMethod( osrfMethodContext* ctx, const osrfMethod* method,
- jsonObject* resp );
+ jsonObject* resp );
static int osrfAppIntrospect( osrfMethodContext* ctx );
static int osrfAppIntrospectAll( osrfMethodContext* ctx );
static int osrfAppEcho( osrfMethodContext* ctx );
+static void osrfMethodFree( char* name, void* p );
+static void osrfAppFree( char* name, void* p );
/**
@brief Registry of applications.
osrfLogSetAppname( appName );
- if( !_osrfAppHash )
+ if( !_osrfAppHash ) {
_osrfAppHash = osrfNewHash();
+ osrfHashSetCallback( _osrfAppHash, osrfAppFree );
+ }
osrfLogInfo( OSRF_LOG_MARK, "Registering application %s with file %s", appName, soFile );
+ // Open the shared object.
void* handle = dlopen( soFile, RTLD_NOW );
if( ! handle ) {
const char* msg = dlerror();
- osrfLogWarning( OSRF_LOG_MARK, "Failed to dlopen library file %s: %s", soFile, msg );
+ osrfLogError( OSRF_LOG_MARK, "Failed to dlopen library file %s: %s", soFile, msg );
return -1;
}
+ // Construct the osrfApplication.
osrfApplication* app = safe_malloc(sizeof(osrfApplication));
app->handle = handle;
- app->onExit = NULL;
app->methods = osrfNewHash();
+ osrfHashSetCallback( app->methods, osrfMethodFree );
+ app->onExit = NULL;
+
+ // Add the newly-constructed app to the list.
osrfHashSet( _osrfAppHash, app, appName );
- /* see if we can run the initialize method */
+ // Try to run the initialize method. Typically it will register one or more
+ // methods of the application.
int (*init) (void);
- *(void **) (&init) = dlsym(app->handle, "osrfAppInitialize");
+ *(void **) (&init) = dlsym( handle, "osrfAppInitialize" );
if( (error = dlerror()) != NULL ) {
osrfLogWarning( OSRF_LOG_MARK,
int ret;
if( (ret = (*init)()) ) {
osrfLogWarning( OSRF_LOG_MARK, "Application %s returned non-zero value from "
- "'osrfAppInitialize', not registering...", appName );
- //free(app->name); /* need a method to remove an application from the list */
- //free(app);
+ "'osrfAppInitialize', not registering...", appName );
+ osrfHashRemove( _osrfAppHash, appName );
return ret;
}
}
- _osrfAppRegisterSysMethods(appName);
-
+ register_system_methods( app );
osrfLogInfo( OSRF_LOG_MARK, "Application %s registered successfully", appName );
-
- osrfAppSetOnExit(app, appName);
+ osrfAppSetOnExit( app, appName );
return 0;
}
The @a options parameter is zero or more of the following macros, OR'd together:
- - OSRF_METHOD_SYSTEM called by static linkage (shouldn't be used here)
- OSRF_METHOD_STREAMING method may return more than one response
- - OSRF_METHOD_ATOMIC return all responses collected in a single RESULT message
- OSRF_METHOD_CACHABLE cache results in memcache
If the OSRF_METHOD_STREAMING bit is set, also register an ".atomic" version of the method.
}
/**
- @brief Register a method for a specified application.
+ @brief Register an extended method for a specified application.
@param appName Name of the application that implements the method.
@param methodName The fully qualified name of the method.
@param argc How many arguments this method expects.
@param options Bit switches setting various options.
@param user_data Opaque pointer to be passed to the dynamically called function.
- @return Zero on success, or -1 on error.
+ @return Zero if successful, or -1 upon error.
This function is identical to osrfAppRegisterMethod(), except that it also installs
a method-specific opaque pointer. When we call the corresponding function at
run time, this pointer will be available to the function via the method context.
*/
int osrfAppRegisterExtendedMethod( const char* appName, const char* methodName,
- const char* symbolName, const char* notes, int argc, int options, void * user_data ) {
+ const char* symbolName, const char* notes, int argc, int options, void * user_data ) {
- if( !appName || ! methodName ) return -1;
+ if( !appName || ! methodName ) return -1;
osrfApplication* app = _osrfAppFindApplication(appName);
if(!app) {
osrfLogDebug( OSRF_LOG_MARK, "Registering method %s for app %s", methodName, appName );
- osrfMethod* method = _osrfAppBuildMethod(
- methodName, symbolName, notes, argc, options, user_data );
- method->options = options;
+ // Extract the only valid option bits, and ignore the rest.
+ int opts = options & ( OSRF_METHOD_STREAMING | OSRF_METHOD_CACHABLE );
- /* plug the method into the list of methods */
- osrfHashSet( app->methods, method, method->name );
+ // Build and install a non-atomic method.
+ register_method(
+ app, methodName, symbolName, notes, argc, opts, user_data );
- if( options & OSRF_METHOD_STREAMING ) { /* build the atomic counterpart */
- int newops = options | OSRF_METHOD_ATOMIC;
- osrfMethod* atomicMethod = _osrfAppBuildMethod(
- methodName, symbolName, notes, argc, newops, user_data );
- osrfHashSet( app->methods, atomicMethod, atomicMethod->name );
+ if( opts & OSRF_METHOD_STREAMING ) {
+ // Build and install an atomic version of the same method.
+ register_method(
+ app, methodName, symbolName, notes, argc, opts | OSRF_METHOD_ATOMIC, user_data );
}
return 0;
}
/**
+ @brief Register a single method for a specified application.
+
+ @param appName Pointer to the application that implements the method.
+ @param methodName The fully qualified name of the method.
+ @param symbolName The symbol name (function name) that implements the method.
+ @param notes Public documentation for this method.
+ @param argc How many arguments this method expects.
+ @param options Bit switches setting various options.
+ @param user_data Opaque pointer to be passed to the dynamically called function.
+*/
+static void register_method( osrfApplication* app, const char* methodName,
+ const char* symbolName, const char* notes, int argc, int options, void * user_data ) {
+
+ if( !app || ! methodName ) return;
+
+ // Build a method and add it to the list of methods
+ osrfMethod* method = build_method(
+ methodName, symbolName, notes, argc, options, user_data );
+ osrfHashSet( app->methods, method, method->name );
+}
+
+/**
@brief Allocate and populate an osrfMethod.
@param methodName Name of the method.
@param symbolName Name of the function that implements the method.
@param options Bit switches setting various options.
@param user_data An opaque pointer to be passed in the method context.
@return Pointer to the newly allocated osrfMethod.
+
+ If OSRF_METHOD_ATOMIC is set, append ".atomic" to the method name.
*/
-static osrfMethod* _osrfAppBuildMethod( const char* methodName, const char* symbolName,
- const char* notes, int argc, int options, void* user_data ) {
+static osrfMethod* build_method( const char* methodName, const char* symbolName,
+ const char* notes, int argc, int options, void* user_data ) {
osrfMethod* method = safe_malloc(sizeof(osrfMethod));
methodName = ""; // should never happen
if( options & OSRF_METHOD_ATOMIC ) {
- // Append ".atomic" to the name, and make the method atomic
+ // Append ".atomic" to the name.
char mb[ strlen( methodName ) + 8 ];
sprintf( mb, "%s.atomic", methodName );
method->name = strdup( mb );
- options |= OSRF_METHOD_STREAMING;
} else {
method->name = strdup(methodName);
}
if(user_data)
method->userData = user_data;
+ method->max_bundle_size = OSRF_MSG_BUNDLE_SIZE;
+ method->max_chunk_size = OSRF_MSG_CHUNK_SIZE;
return method;
}
/**
+ @brief Set the effective output buffer size for a given method.
+ @param appName Name of the application.
+ @param methodName Name of the method.
+ @param max_bundle_size Desired size of the output buffer, in bytes.
+ @return Zero if successful, or -1 if the specified method cannot be found.
+
+ A smaller buffer size may result in a lower latency for the first response, since we don't
+ wait for as many messages to accumulate before flushing the output buffer. On the other
+ hand a larger buffer size may result in higher throughput due to lower network overhead.
+
+ Since the buffer size is not an absolute limit, it may be set to zero, in which case each
+ output transport message will contain no more than one RESULT message.
+
+ This function has no effect on atomic methods, because all responses are sent in a single
+ message anyway. Likewise it has no effect on a method that returns only a single response.
+*/
+int osrfMethodSetBundleSize( const char* appName, const char* methodName, size_t max_bundle_size ) {
+ osrfMethod* method = _osrfAppFindMethod( appName, methodName );
+ if( method ) {
+ osrfLogInfo( OSRF_LOG_MARK,
+ "Setting outbuf buffer size to %lu for method %s of application %s",
+ (unsigned long) max_bundle_size, methodName, appName );
+ method->max_bundle_size = max_bundle_size;
+ return 0;
+ } else {
+ osrfLogWarning( OSRF_LOG_MARK,
+ "Unable to set outbuf buffer size to %lu for method %s of application %s",
+ (unsigned long) max_bundle_size, methodName, appName );
+ return -1;
+ }
+}
+
+/**
@brief Register all of the system methods for this application.
- @param app Application name.
+ @param app Pointer to the application.
A client can call these methods the same way it calls application-specific methods,
but they are implemented by functions here in this module, not by functions in the
shared object.
*/
-static void _osrfAppRegisterSysMethods( const char* app ) {
-
- osrfAppRegisterMethod(
- app, OSRF_SYSMETHOD_INTROSPECT, NULL,
- "Return a list of methods whose names have the same initial "
- "substring as that of the provided method name PARAMS( methodNameSubstring )",
- 1, OSRF_METHOD_SYSTEM | OSRF_METHOD_STREAMING );
-
- osrfAppRegisterMethod(
- app, OSRF_SYSMETHOD_INTROSPECT_ALL, NULL,
- "Returns a complete list of methods. PARAMS()", 0,
- OSRF_METHOD_SYSTEM | OSRF_METHOD_STREAMING );
-
- osrfAppRegisterMethod(
- app, OSRF_SYSMETHOD_ECHO, NULL,
- "Echos all data sent to the server back to the client. PARAMS([a, b, ...])", 0,
- OSRF_METHOD_SYSTEM | OSRF_METHOD_STREAMING );
+static void register_system_methods( osrfApplication* app ) {
+
+ if( !app ) return;
+
+ register_method(
+ app, OSRF_SYSMETHOD_INTROSPECT, NULL,
+ "Return a list of methods whose names have the same initial "
+ "substring as that of the provided method name PARAMS( methodNameSubstring )",
+ 1, OSRF_METHOD_SYSTEM | OSRF_METHOD_STREAMING,
+ NULL );
+
+ register_method(
+ app, OSRF_SYSMETHOD_INTROSPECT, NULL,
+ "Return a list of methods whose names have the same initial "
+ "substring as that of the provided method name PARAMS( methodNameSubstring )",
+ 1, OSRF_METHOD_SYSTEM | OSRF_METHOD_STREAMING | OSRF_METHOD_ATOMIC,
+ NULL );
+
+ register_method(
+ app, OSRF_SYSMETHOD_INTROSPECT_ALL, NULL,
+ "Returns a complete list of methods. PARAMS()",
+ 0, OSRF_METHOD_SYSTEM | OSRF_METHOD_STREAMING,
+ NULL );
+
+ register_method(
+ app, OSRF_SYSMETHOD_INTROSPECT_ALL, NULL,
+ "Returns a complete list of methods. PARAMS()",
+ 0, OSRF_METHOD_SYSTEM | OSRF_METHOD_STREAMING | OSRF_METHOD_ATOMIC,
+ NULL );
+
+ register_method(
+ app, OSRF_SYSMETHOD_ECHO, NULL,
+ "Echos all data sent to the server back to the client. PARAMS([a, b, ...])",
+ 0, OSRF_METHOD_SYSTEM | OSRF_METHOD_STREAMING,
+ NULL );
+
+ register_method(
+ app, OSRF_SYSMETHOD_ECHO, NULL,
+ "Echos all data sent to the server back to the client. PARAMS([a, b, ...])",
+ 0, OSRF_METHOD_SYSTEM | OSRF_METHOD_STREAMING | OSRF_METHOD_ATOMIC,
+ NULL );
}
/**
"Adding responses to stash for method %s", ctx->method->name );
if( data ) {
- // If you want to flush the intput buffers for every output message,
- // this is the place to do it.
- //osrf_app_session_queue_wait( ctx->session, 0, NULL );
-
- // Create an OSRF message
- osrfMessage* msg = osrf_message_init( RESULT, ctx->request, 1 );
- osrf_message_set_status_info( msg, NULL, "OK", OSRF_STATUS_OK );
- osrf_message_set_result( msg, data );
-
- // Serialize the OSRF message into JSON text
- char* json = jsonObjectToJSON( osrfMessageToJSON( msg ));
- osrfMessageFree( msg );
-
- // If the new message would overflow the buffer, flush the output buffer first
- int len_so_far = buffer_length( ctx->session->outbuf );
- if( len_so_far && (strlen( json ) + len_so_far >= OSRF_MSG_BUFFER_SIZE - 3) ) {
- if( flush_responses( ctx->session, ctx->session->outbuf ))
- return -1;
- }
+ char* data_str = jsonObjectToJSON(data); // free me (below)
+ size_t raw_size = strlen(data_str);
+ size_t extra_size = osrfXmlEscapingLength(data_str);
+ size_t data_size = raw_size + extra_size;
+ size_t chunk_size = ctx->method->max_chunk_size;
- // Append the JSON text to the output buffer
- append_msg( ctx->session->outbuf, json );
- free( json );
+ if (data_size > chunk_size) // calculate an escape-scaled chunk size
+ chunk_size = ((double)raw_size / (double)data_size) * (double)chunk_size;
+
+ if (chunk_size > 0 && chunk_size < raw_size) {
+ // chunking -- response message exceeds max message size.
+ // break it up into chunks for partial delivery
+
+ osrfSendChunkedResult(ctx->session, ctx->request,
+ data_str, raw_size, chunk_size);
+
+ } else {
+
+ // bundling -- message body (may be) too small for single
+ // delivery. prepare message for bundling.
+
+ // Create an OSRF message
+ osrfMessage* msg = osrf_message_init( RESULT, ctx->request, 1 );
+ osrf_message_set_status_info( msg, NULL, "OK", OSRF_STATUS_OK );
+ osrf_message_set_result( msg, data );
+
+ // Serialize the OSRF message into JSON text
+ char* json = jsonObjectToJSON( osrfMessageToJSON( msg ));
+ osrfMessageFree( msg );
+
+ // If the new message would overflow the buffer, flush the output buffer first
+ int len_so_far = buffer_length( ctx->session->outbuf );
+ if( len_so_far && (strlen( json ) + len_so_far + 3 >= ctx->method->max_bundle_size )) {
+ if( flush_responses( ctx->session, ctx->session->outbuf ))
+ return -1;
+ }
+
+ // Append the JSON text to the output buffer
+ append_msg( ctx->session->outbuf, json );
+ free( json );
+ }
+
+ free(data_str);
}
if(complete) {
// Log the call, with the method and parameters
char* params_str = jsonObjectToJSON( ctx->params );
if( params_str ) {
- osrfLogInfo( OSRF_LOG_MARK, "CALL:\t%s %s - %s",
- ctx->session->remote_service, ctx->method->name, params_str );
+ // params_str will at minimum be "[]"
+ int i = 0;
+ const char* str;
+ char* method = ctx->method->name;
+ int redact_params = 0;
+ while( (str = osrfStringArrayGetString(log_protect_arr, i++)) ) {
+ //osrfLogInternal(OSRF_LOG_MARK, "Checking for log protection [%s]", str);
+ if(!strncmp(method, str, strlen(str))) {
+ redact_params = 1;
+ break;
+ }
+ }
+
+ char* params_logged;
+ if(redact_params) {
+ params_logged = strdup("**PARAMS REDACTED**");
+ } else {
+ params_str[strlen(params_str) - 1] = '\0'; // drop the trailing ']'
+ params_logged = strdup(params_str + 1);
+ }
free( params_str );
+ osrfLogInfo( OSRF_LOG_MARK, "CALL: %s %s %s",
+ ctx->session->remote_service, ctx->method->name, params_logged);
+ free( params_logged );
}
return 0;
}
+
+/**
+ @brief Free an osrfMethod.
+ @param name Name of the method (not used).
+ @param p Void pointer pointing to the osrfMethod.
+
+ This function is designed to be installed as a callback for an osrfHash (hence the
+ unused @a name parameter and the void pointer).
+*/
+static void osrfMethodFree( char* name, void* p ) {
+ osrfMethod* method = p;
+ if( method ) {
+ free( method->name );
+ free( method->symbol );
+ free( method->notes );
+ free( method );
+ }
+}
+
+/**
+ @brief Free an osrfApplication
+ @param name Name of the application (not used).
+ @param p Void pointer pointing to the osrfApplication.
+
+ This function is designed to be installed as a callback for an osrfHash (hence the
+ unused @a name parameter and the void pointer).
+*/
+static void osrfAppFree( char* name, void* p ) {
+ osrfApplication* app = p;
+ if( app ) {
+ dlclose( app->handle );
+ osrfHashFree( app->methods );
+ free( app );
+ }
+}