int max_requests; /**< How many requests a child processes before terminating. */
int min_children; /**< Minimum number of children to maintain. */
int max_children; /**< Maximum number of children to maintain. */
+ int max_backlog_queue; /**< Maximum size of backlog queue. */
int fd; /**< Unused. */
int data_to_child; /**< Unused. */
int data_to_parent; /**< Unused. */
static volatile sig_atomic_t child_dead;
static int prefork_simple_init( prefork_simple* prefork, transport_client* client,
- int max_requests, int min_children, int max_children );
+ int max_requests, int min_children, int max_children, int max_backlog_queue );
static prefork_child* launch_child( prefork_simple* forker );
static void prefork_launch_children( prefork_simple* forker );
static void prefork_run( prefork_simple* forker );
int maxr = 1000;
int maxc = 10;
+ int maxbq = 1000;
int minc = 3;
int kalive = 5;
char* max_req = osrf_settings_host_value( "/apps/%s/unix_config/max_requests", appname );
char* min_children = osrf_settings_host_value( "/apps/%s/unix_config/min_children", appname );
char* max_children = osrf_settings_host_value( "/apps/%s/unix_config/max_children", appname );
+ char* max_backlog_queue = osrf_settings_host_value( "/apps/%s/unix_config/max_backlog_queue", appname );
char* keepalive = osrf_settings_host_value( "/apps/%s/keepalive", appname );
if( !keepalive )
else
maxc = atoi( max_children );
+ if( !max_backlog_queue )
+ osrfLogWarning( OSRF_LOG_MARK, "Max backlog queue size not defined, assuming %d", maxbq );
+ else
+ maxbq = atoi( max_backlog_queue );
+
free( keepalive );
free( max_req );
free( min_children );
free( max_children );
+ free( max_backlog_queue );
/* --------------------------------------------------- */
char* resc = va_list_to_string( "%s_listener", appname );
prefork_simple forker;
- if( prefork_simple_init( &forker, osrfSystemGetTransportClient(), maxr, minc, maxc )) {
+ if( prefork_simple_init( &forker, osrfSystemGetTransportClient(), maxr, minc, maxc, maxbq )) {
osrfLogError( OSRF_LOG_MARK,
"osrf_prefork_run() failed to create prefork_simple object" );
return -1;
before terminating.
@param min_children Minimum number of child processes to maintain.
@param max_children Maximum number of child processes to maintain.
+ @param max_backlog_queue Maximum size of backlog queue.
@return 0 if successful, or 1 if not (due to invalid parameters).
*/
static int prefork_simple_init( prefork_simple* prefork, transport_client* client,
- int max_requests, int min_children, int max_children ) {
+ int max_requests, int min_children, int max_children, int max_backlog_queue ) {
if( min_children > max_children ) {
osrfLogError( OSRF_LOG_MARK, "min_children (%d) is greater "
prefork->max_requests = max_requests;
prefork->min_children = min_children;
prefork->max_children = max_children;
+ prefork->max_backlog_queue = max_backlog_queue;
prefork->fd = 0;
prefork->data_to_child = 0;
prefork->data_to_parent = 0;
transport_message* cur_msg = NULL;
+ // The backlog queue accumulates messages received while there
+ // are not yet children available to process them. While the
+ // transport client maintains its own queue of messages, sweeping
+ // the transport client's queue in the backlog queue gives us the
+ // ability to set a limit on the size of the backlog queue (and
+ // then to drop messages once the backlog queue has filled up)
+ transport_message* backlog_queue_head = NULL;
+ transport_message* backlog_queue_tail = NULL;
+ int backlog_queue_size = 0;
+
while( 1 ) {
if( forker->first_child == NULL && forker->idle_list == NULL ) {/* no more children */
return;
}
- // Wait indefinitely for an input message
- osrfLogDebug( OSRF_LOG_MARK, "Forker going into wait for data..." );
- cur_msg = client_recv( forker->connection, -1 );
+ int received_from_network = 0;
+ if ( backlog_queue_size == 0 ) {
+ // Wait indefinitely for an input message
+ osrfLogDebug( OSRF_LOG_MARK, "Forker going into wait for data..." );
+ cur_msg = client_recv( forker->connection, -1 );
+ received_from_network = 1;
+ } else {
+ // See if any messages are immediately available
+ cur_msg = client_recv( forker->connection, 0 );
+ if ( cur_msg != NULL )
+ received_from_network = 1;
+ }
- if( cur_msg == NULL ) {
- // most likely a signal was received. clean up any recently
- // deceased children and try again.
- if(child_dead)
- reap_children(forker);
- continue;
- }
+ if (received_from_network) {
+ if( cur_msg == NULL ) {
+ // most likely a signal was received. clean up any recently
+ // deceased children and try again.
+ if(child_dead)
+ reap_children(forker);
+ continue;
+ }
- if (cur_msg->error_type) {
- osrfLogInfo(OSRF_LOG_MARK,
- "Listener received an XMPP error message. "
- "Likely a bounced message. sender=%s", cur_msg->sender);
- if(child_dead)
- reap_children(forker);
- continue;
- }
+ if (cur_msg->error_type) {
+ osrfLogInfo(OSRF_LOG_MARK,
+ "Listener received an XMPP error message. "
+ "Likely a bounced message. sender=%s", cur_msg->sender);
+ if(child_dead)
+ reap_children(forker);
+ continue;
+ }
- message_prepare_xml( cur_msg );
- const char* msg_data = cur_msg->msg_xml;
- if( ! msg_data || ! *msg_data ) {
- osrfLogWarning( OSRF_LOG_MARK, "Received % message from %s, thread %",
- (msg_data ? "empty" : "NULL"), cur_msg->sender, cur_msg->thread );
- message_free( cur_msg );
- continue; // Message not usable; go on to the next one.
+ message_prepare_xml( cur_msg );
+ const char* msg_data = cur_msg->msg_xml;
+ if( ! msg_data || ! *msg_data ) {
+ osrfLogWarning( OSRF_LOG_MARK, "Received % message from %s, thread %",
+ (msg_data ? "empty" : "NULL"), cur_msg->sender, cur_msg->thread );
+ message_free( cur_msg );
+ continue; // Message not usable; go on to the next one.
+ }
+
+ // stick message onto queue
+ cur_msg->next = NULL;
+ if (backlog_queue_size == 0) {
+ backlog_queue_head = cur_msg;
+ backlog_queue_tail = cur_msg;
+ } else {
+ if (backlog_queue_size >= forker->max_backlog_queue) {
+ osrfLogWarning ( OSRF_LOG_MARK, "Reached backlog queue limit of %d; dropping "
+ "latest message",
+ forker->max_backlog_queue );
+ osrfMessage* err = osrf_message_init( STATUS, 1, 1 );
+ osrf_message_set_status_info( err, "osrfMethodException",
+ "Service unavailable: no available children and backlog queue at limit",
+ OSRF_STATUS_SERVICEUNAVAILABLE );
+ char *data = osrf_message_serialize( err );
+ osrfMessageFree( err );
+ transport_message* tresponse = message_init( data, "", cur_msg->thread, cur_msg->router_from, cur_msg->recipient );
+ message_set_osrf_xid(tresponse, cur_msg->osrf_xid);
+ free( data );
+ transport_client* client = osrfSystemGetTransportClient();
+ client_send_message( client, tresponse );
+ message_free( tresponse );
+ message_free(cur_msg);
+ continue;
+ }
+ backlog_queue_tail->next = cur_msg;
+ backlog_queue_tail = cur_msg;
+ osrfLogWarning( OSRF_LOG_MARK, "Adding message to non-empty backlog queue." );
+ }
+ backlog_queue_size++;
}
+ if (backlog_queue_size == 0) {
+ // strictly speaking, this check may be redundant, but
+ // from this point forward we can be sure that the
+ // backlog queue has at least one message in it and
+ // that if we can find a child to process it, we want to
+ // process the head of that queue.
+ continue;
+ }
+
+ cur_msg = backlog_queue_head;
+
int honored = 0; /* will be set to true when we service the request */
int no_recheck = 0;
while( ! honored ) {
- if( !no_recheck ) {
- if(check_children( forker, 0 ) < 0) {
+ if( !no_recheck ) {
+ if(check_children( forker, 0 ) < 0) {
continue; // check failed, try again
- }
+ }
}
no_recheck = 0;
osrfLogInternal( OSRF_LOG_MARK, "Writing to child fd %d",
cur_child->write_data_fd );
+ const char* msg_data = cur_msg->msg_xml;
int written = write( cur_child->write_data_fd, msg_data, strlen( msg_data ) + 1 );
if( written < 0 ) {
// This child appears to be dead or unusable. Discard it.
osrfLogDebug( OSRF_LOG_MARK, "Writing to new child fd %d : pid %d",
new_child->write_data_fd, new_child->pid );
+ const char* msg_data = cur_msg->msg_xml;
int written = write(
new_child->write_data_fd, msg_data, strlen( msg_data ) + 1 );
if( written < 0 ) {
}
}
- if( !honored ) {
- osrfLogWarning( OSRF_LOG_MARK, "No children available, waiting..." );
- if( check_children( forker, 1 ) >= 0 ) {
- // Tell the loop not to call check_children again, since we just successfully called it
- no_recheck = 1;
- }
- }
-
if( child_dead )
reap_children( forker );
+ if( !honored ) {
+ break;
+ }
+
} // end while( ! honored )
- message_free( cur_msg );
+ if ( honored ) {
+ backlog_queue_head = cur_msg->next;
+ backlog_queue_size--;
+ cur_msg->next = NULL;
+ message_free( cur_msg );
+ }
} /* end top level listen loop */
}