2 @file osrf_app_session.c
3 @brief Implementation of osrfAppSession.
7 #include "opensrf/osrf_app_session.h"
9 struct osrf_app_request_struct {
10 /** Our controlling session. */
11 struct osrf_app_session_struct* session;
15 /** True if we have received a 'request complete' message from our request. */
17 /** Our original request payload. */
19 /** List of responses to our request. */
22 /** Boolean; if true, then a call that is waiting on a response, will reset the
23 timeout and set this variable back to false. */
26 typedef struct osrf_app_request_struct osrfAppRequest;
28 /** Send the given message */
29 static int _osrf_app_session_send( osrfAppSession*, osrfMessage* msg );
31 static int osrfAppSessionMakeLocaleRequest(
32 osrfAppSession* session, const jsonObject* params, const char* method_name,
33 int protocol, osrfStringArray* param_strings, char* locale );
35 /* the global app_session cache */
36 static osrfHash* osrfAppSessionCache = NULL;
38 // --------------------------------------------------------------------------
39 // --------------------------------------------------------------------------
41 // --------------------------------------------------------------------------
43 /** Allocates and initializes a new app_request object */
44 static osrfAppRequest* _osrf_app_request_init(
45 osrfAppSession* session, osrfMessage* msg ) {
48 (osrfAppRequest*) safe_malloc(sizeof(osrfAppRequest));
50 req->session = session;
51 req->request_id = msg->thread_trace;
55 req->reset_timeout = 0;
62 void osrfAppSessionCleanup() {
63 osrfHashFree(osrfAppSessionCache);
64 osrfAppSessionCache = NULL;
67 /** Frees memory used by an app_request object */
68 static void _osrf_app_request_free( void * req ){
69 if( req == NULL ) return;
70 osrfAppRequest* r = (osrfAppRequest*) req;
71 if( r->payload ) osrfMessageFree( r->payload );
73 /* Free the messages in the result queue */
75 osrfMessage* next_msg;
77 next_msg = r->result->next;
78 osrfMessageFree( r->result );
85 /** Pushes the given message onto the list of 'responses' to this request */
86 static void _osrf_app_request_push_queue( osrfAppRequest* req, osrfMessage* result ){
87 if(req == NULL || result == NULL) return;
88 osrfLogDebug( OSRF_LOG_MARK, "App Session pushing request [%d] onto request queue", result->thread_trace );
89 if(req->result == NULL) {
94 osrfMessage* ptr = req->result;
95 osrfMessage* ptr2 = req->result->next;
104 /** Removes this app_request from our session request set */
105 void osrf_app_session_request_finish(
106 osrfAppSession* session, int req_id ){
108 if(session == NULL) return;
109 osrfAppRequest* req = OSRF_LIST_GET_INDEX( session->request_queue, req_id );
110 if(req == NULL) return;
111 osrfListRemove( req->session->request_queue, req->request_id );
115 void osrf_app_session_request_reset_timeout( osrfAppSession* session, int req_id ) {
116 if(session == NULL) return;
117 osrfLogDebug( OSRF_LOG_MARK, "Resetting request timeout %d", req_id );
118 osrfAppRequest* req = OSRF_LIST_GET_INDEX( session->request_queue, req_id );
119 if(req == NULL) return;
120 req->reset_timeout = 1;
124 Checks the receive queue for messages. If any are found, the first
125 is popped off and returned. Otherwise, this method will wait at most timeout
126 seconds for a message to appear in the receive queue. Once it arrives it is returned.
127 If no messages arrive in the timeout provided, null is returned.
129 static osrfMessage* _osrf_app_request_recv( osrfAppRequest* req, int timeout ) {
131 if(req == NULL) return NULL;
133 if( req->result != NULL ) {
134 /* pop off the first message in the list */
135 osrfMessage* tmp_msg = req->result;
136 req->result = req->result->next;
140 time_t start = time(NULL);
141 time_t remaining = (time_t) timeout;
143 while( remaining >= 0 ) {
144 /* tell the session to wait for stuff */
145 osrfLogDebug( OSRF_LOG_MARK, "In app_request receive with remaining time [%d]", (int) remaining );
147 osrf_app_session_queue_wait( req->session, 0, NULL );
148 if(req->session->transport_error) {
149 osrfLogError(OSRF_LOG_MARK, "Transport error in recv()");
153 if( req->result != NULL ) { /* if we received anything */
154 /* pop off the first message in the list */
155 osrfLogDebug( OSRF_LOG_MARK, "app_request_recv received a message, returning it");
156 osrfMessage* ret_msg = req->result;
157 req->result = ret_msg->next;
158 if (ret_msg->sender_locale)
159 osrf_app_session_set_locale(req->session, ret_msg->sender_locale);
167 osrf_app_session_queue_wait( req->session, (int) remaining, NULL );
169 if(req->session->transport_error) {
170 osrfLogError(OSRF_LOG_MARK, "Transport error in recv()");
174 if( req->result != NULL ) { /* if we received anything */
175 /* pop off the first message in the list */
176 osrfLogDebug( OSRF_LOG_MARK, "app_request_recv received a message, returning it");
177 osrfMessage* ret_msg = req->result;
178 req->result = ret_msg->next;
179 if (ret_msg->sender_locale)
180 osrf_app_session_set_locale(req->session, ret_msg->sender_locale);
187 if(req->reset_timeout) {
188 remaining = (time_t) timeout;
189 req->reset_timeout = 0;
190 osrfLogDebug( OSRF_LOG_MARK, "Received a timeout reset");
192 remaining -= (int) (time(NULL) - start);
196 char* paramString = jsonObjectToJSON(req->payload->_params);
197 osrfLogInfo( OSRF_LOG_MARK, "Returning NULL from app_request_recv after timeout: %s %s",
198 req->payload->method_name, paramString);
204 /** Resend this requests original request message */
205 static int _osrf_app_request_resend( osrfAppRequest* req ) {
206 if(req == NULL) return 0;
208 osrfLogDebug( OSRF_LOG_MARK, "Resending request [%d]", req->request_id );
209 return _osrf_app_session_send( req->session, req->payload );
216 // --------------------------------------------------------------------------
217 // --------------------------------------------------------------------------
219 // --------------------------------------------------------------------------
221 /** Install a locale for the session */
222 char* osrf_app_session_set_locale( osrfAppSession* session, const char* locale ) {
223 if (!session || !locale)
226 if(session->session_locale) {
227 if( strlen(session->session_locale) >= strlen(locale) ) {
228 /* There's room available; just copy */
229 strcpy(session->session_locale, locale);
231 free(session->session_locale);
232 session->session_locale = strdup( locale );
235 session->session_locale = strdup( locale );
238 return session->session_locale;
241 /** returns a session from the global session hash */
242 osrfAppSession* osrf_app_session_find_session( const char* session_id ) {
243 if(session_id) return osrfHashGet(osrfAppSessionCache, session_id);
248 /** adds a session to the global session cache */
249 static void _osrf_app_session_push_session( osrfAppSession* session ) {
251 if( osrfAppSessionCache == NULL ) osrfAppSessionCache = osrfNewHash();
252 if( osrfHashGet( osrfAppSessionCache, session->session_id ) ) return;
253 osrfHashSet( osrfAppSessionCache, session, session->session_id );
256 /** Allocates and initializes a new app_session */
258 osrfAppSession* osrfAppSessionClientInit( const char* remote_service ) {
260 if (!remote_service) {
261 osrfLogWarning( OSRF_LOG_MARK, "No remote service specified in osrfAppSessionClientInit");
265 osrfAppSession* session = safe_malloc(sizeof(osrfAppSession));
267 session->transport_handle = osrfSystemGetTransportClient();
268 if( session->transport_handle == NULL ) {
269 osrfLogWarning( OSRF_LOG_MARK, "No transport client for service 'client'");
274 osrfStringArray* arr = osrfNewStringArray(8);
275 osrfConfigGetValueList(NULL, arr, "/domain");
276 const char* domain = osrfStringArrayGetString(arr, 0);
279 osrfLogWarning( OSRF_LOG_MARK, "No domains specified in the OpenSRF config file");
281 osrfStringArrayFree(arr);
285 char* router_name = osrfConfigGetValue(NULL, "/router_name");
287 osrfLogWarning( OSRF_LOG_MARK, "No router name specified in the OpenSRF config file");
289 osrfStringArrayFree(arr);
293 char target_buf[512];
294 target_buf[ 0 ] = '\0';
296 int len = snprintf( target_buf, sizeof(target_buf), "%s@%s/%s",
297 router_name ? router_name : "(null)",
298 domain ? domain : "(null)",
299 remote_service ? remote_service : "(null)" );
300 osrfStringArrayFree(arr);
304 if( len >= sizeof( target_buf ) ) {
305 osrfLogWarning( OSRF_LOG_MARK, "Buffer overflow for remote_id");
310 session->request_queue = osrfNewList();
311 session->request_queue->freeItem = &_osrf_app_request_free;
312 session->remote_id = strdup(target_buf);
313 session->orig_remote_id = strdup(session->remote_id);
314 session->remote_service = strdup(remote_service);
315 session->session_locale = NULL;
316 session->transport_error = 0;
318 #ifdef ASSUME_STATELESS
319 session->stateless = 1;
320 osrfLogDebug( OSRF_LOG_MARK, "%s session is stateless", remote_service );
322 session->stateless = 0;
323 osrfLogDebug( OSRF_LOG_MARK, "%s session is NOT stateless", remote_service );
326 /* build a chunky, random session id */
329 snprintf(id, sizeof(id), "%f.%d%ld", get_timestamp_millis(), (int)time(NULL), (long) getpid());
330 session->session_id = strdup(id);
331 osrfLogDebug( OSRF_LOG_MARK, "Building a new client session with id [%s] [%s]",
332 session->remote_service, session->session_id );
334 session->thread_trace = 0;
335 session->state = OSRF_SESSION_DISCONNECTED;
336 session->type = OSRF_SESSION_CLIENT;
337 //session->next = NULL;
339 session->userData = NULL;
340 session->userDataFree = NULL;
342 _osrf_app_session_push_session( session );
346 osrfAppSession* osrf_app_server_session_init(
347 const char* session_id, const char* our_app, const char* remote_id ) {
349 osrfLogDebug( OSRF_LOG_MARK, "Initing server session with session id %s, service %s,"
350 " and remote_id %s", session_id, our_app, remote_id );
352 osrfAppSession* session = osrf_app_session_find_session( session_id );
353 if(session) return session;
355 session = safe_malloc(sizeof(osrfAppSession));
357 session->transport_handle = osrfSystemGetTransportClient();
358 if( session->transport_handle == NULL ) {
359 osrfLogWarning( OSRF_LOG_MARK, "No transport client for service '%s'", our_app );
365 char* statel = osrf_settings_host_value("/apps/%s/stateless", our_app );
366 if(statel) stateless = atoi(statel);
370 session->request_queue = osrfNewList();
371 session->request_queue->freeItem = &_osrf_app_request_free;
372 session->remote_id = strdup(remote_id);
373 session->orig_remote_id = strdup(remote_id);
374 session->session_id = strdup(session_id);
375 session->remote_service = strdup(our_app);
376 session->stateless = stateless;
378 #ifdef ASSUME_STATELESS
379 session->stateless = 1;
381 session->stateless = 0;
384 session->thread_trace = 0;
385 session->state = OSRF_SESSION_DISCONNECTED;
386 session->type = OSRF_SESSION_SERVER;
387 session->session_locale = NULL;
389 session->userData = NULL;
390 session->userDataFree = NULL;
392 _osrf_app_session_push_session( session );
397 int osrfAppSessionMakeRequest(
398 osrfAppSession* session, const jsonObject* params,
399 const char* method_name, int protocol, osrfStringArray* param_strings ) {
401 return osrfAppSessionMakeLocaleRequest( session, params,
402 method_name, protocol, param_strings, NULL );
405 static int osrfAppSessionMakeLocaleRequest(
406 osrfAppSession* session, const jsonObject* params, const char* method_name,
407 int protocol, osrfStringArray* param_strings, char* locale ) {
409 if(session == NULL) return -1;
413 osrfMessage* req_msg = osrf_message_init( REQUEST, ++(session->thread_trace), protocol );
414 osrf_message_set_method(req_msg, method_name);
417 osrf_message_set_locale(req_msg, locale);
418 } else if (session->session_locale) {
419 osrf_message_set_locale(req_msg, session->session_locale);
423 osrf_message_set_params(req_msg, params);
429 for(i = 0; i!= param_strings->size ; i++ ) {
430 osrf_message_add_param(req_msg,
431 osrfStringArrayGetString(param_strings,i));
436 osrfAppRequest* req = _osrf_app_request_init( session, req_msg );
437 if(_osrf_app_session_send( session, req_msg ) ) {
438 osrfLogWarning( OSRF_LOG_MARK, "Error sending request message [%d]", session->thread_trace );
439 _osrf_app_request_free(req);
443 osrfLogDebug( OSRF_LOG_MARK, "Pushing [%d] onto request queue for session [%s] [%s]",
444 req->request_id, session->remote_service, session->session_id );
445 osrfListSet( session->request_queue, req, req->request_id );
446 return req->request_id;
449 void osrf_app_session_set_complete( osrfAppSession* session, int request_id ) {
453 osrfAppRequest* req = OSRF_LIST_GET_INDEX( session->request_queue, request_id );
454 if(req) req->complete = 1;
457 int osrf_app_session_request_complete( const osrfAppSession* session, int request_id ) {
460 osrfAppRequest* req = OSRF_LIST_GET_INDEX( session->request_queue, request_id );
462 return req->complete;
467 /** Resets the remote connection id to that of the original*/
468 void osrf_app_session_reset_remote( osrfAppSession* session ){
472 osrfLogDebug( OSRF_LOG_MARK, "App Session [%s] [%s] resetting remote id to %s",
473 session->remote_service, session->session_id, session->orig_remote_id );
475 osrf_app_session_set_remote( session, session->orig_remote_id );
478 void osrf_app_session_set_remote( osrfAppSession* session, const char* remote_id ) {
482 if( session->remote_id ) {
483 if( strlen(session->remote_id) >= strlen(remote_id) ) {
484 // There's enough room; just copy it
485 strcpy(session->remote_id, remote_id);
487 free(session->remote_id );
488 session->remote_id = strdup( remote_id );
491 session->remote_id = strdup( remote_id );
495 pushes the given message into the result list of the app_request
496 with the given request_id
498 int osrf_app_session_push_queue(
499 osrfAppSession* session, osrfMessage* msg ){
500 if(session == NULL || msg == NULL) return 0;
502 osrfAppRequest* req = OSRF_LIST_GET_INDEX( session->request_queue, msg->thread_trace );
503 if(req == NULL) return 0;
504 _osrf_app_request_push_queue( req, msg );
509 /** Attempts to connect to the remote service */
510 int osrfAppSessionConnect( osrfAppSession* session ) {
515 if(session->state == OSRF_SESSION_CONNECTED) {
519 int timeout = 5; /* XXX CONFIG VALUE */
521 osrfLogDebug( OSRF_LOG_MARK, "AppSession connecting to %s", session->remote_id );
523 /* defaulting to protocol 1 for now */
524 osrfMessage* con_msg = osrf_message_init( CONNECT, session->thread_trace, 1 );
525 osrf_app_session_reset_remote( session );
526 session->state = OSRF_SESSION_CONNECTING;
527 int ret = _osrf_app_session_send( session, con_msg );
528 osrfMessageFree(con_msg);
532 time_t start = time(NULL);
533 time_t remaining = (time_t) timeout;
535 while( session->state != OSRF_SESSION_CONNECTED && remaining >= 0 ) {
536 osrf_app_session_queue_wait( session, remaining, NULL );
537 if(session->transport_error) {
538 osrfLogError(OSRF_LOG_MARK, "cannot communicate with %s", session->remote_service);
541 remaining -= (int) (time(NULL) - start);
544 if(session->state == OSRF_SESSION_CONNECTED)
545 osrfLogDebug( OSRF_LOG_MARK, " * Connected Successfully to %s", session->remote_service );
547 if(session->state != OSRF_SESSION_CONNECTED)
555 /** Disconnects from the remote service */
556 int osrf_app_session_disconnect( osrfAppSession* session){
560 if(session->state == OSRF_SESSION_DISCONNECTED)
563 if(session->stateless && session->state != OSRF_SESSION_CONNECTED) {
564 osrfLogDebug( OSRF_LOG_MARK,
565 "Exiting disconnect on stateless session %s",
566 session->session_id);
570 osrfLogDebug(OSRF_LOG_MARK, "AppSession disconnecting from %s", session->remote_id );
572 osrfMessage* dis_msg = osrf_message_init( DISCONNECT, session->thread_trace, 1 );
573 _osrf_app_session_send( session, dis_msg );
574 session->state = OSRF_SESSION_DISCONNECTED;
576 osrfMessageFree( dis_msg );
577 osrf_app_session_reset_remote( session );
581 int osrf_app_session_request_resend( osrfAppSession* session, int req_id ) {
582 osrfAppRequest* req = OSRF_LIST_GET_INDEX( session->request_queue, req_id );
583 return _osrf_app_request_resend( req );
587 static int osrfAppSessionSendBatch( osrfAppSession* session, osrfMessage* msgs[], int size ) {
589 if( !(session && msgs && size > 0) ) return 0;
592 osrfMessage* msg = msgs[0];
596 osrf_app_session_queue_wait( session, 0, NULL );
598 if(session->state != OSRF_SESSION_CONNECTED) {
600 if(session->stateless) { /* stateless session always send to the root listener */
601 osrf_app_session_reset_remote(session);
605 /* do an auto-connect if necessary */
606 if( ! session->stateless &&
607 (msg->m_type != CONNECT) &&
608 (msg->m_type != DISCONNECT) &&
609 (session->state != OSRF_SESSION_CONNECTED) ) {
611 if(!osrfAppSessionConnect( session ))
618 char* string = osrfMessageSerializeBatch(msgs, size);
622 transport_message* t_msg = message_init(
623 string, "", session->session_id, session->remote_id, NULL );
624 message_set_osrf_xid( t_msg, osrfLogGetXid() );
626 retval = client_send_message( session->transport_handle, t_msg );
628 if( retval ) osrfLogError(OSRF_LOG_MARK, "client_send_message failed");
630 osrfLogInfo(OSRF_LOG_MARK, "[%s] sent %d bytes of data to %s",
631 session->remote_service, strlen(string), t_msg->recipient );
633 osrfLogDebug(OSRF_LOG_MARK, "Sent: %s", string );
636 message_free( t_msg );
644 static int _osrf_app_session_send( osrfAppSession* session, osrfMessage* msg ){
645 if( !(session && msg) ) return 0;
648 return osrfAppSessionSendBatch( session, a, 1 );
653 Waits up to 'timeout' seconds for some data to arrive.
654 Any data that arrives will be processed according to its
655 payload and message type. This method will return after
656 any data has arrived.
658 int osrf_app_session_queue_wait( osrfAppSession* session, int timeout, int* recvd ){
659 if(session == NULL) return 0;
660 osrfLogDebug(OSRF_LOG_MARK, "AppSession in queue_wait with timeout %d", timeout );
661 return osrf_stack_entry_point(session->transport_handle, timeout, recvd);
664 /** Disconnects (if client) and removes the given session from the global session cache
665 ! This frees all attached app_requests !
667 void osrfAppSessionFree( osrfAppSession* session ){
668 if(session == NULL) return;
672 osrfLogDebug(OSRF_LOG_MARK, "AppSession [%s] [%s] destroying self and deleting requests",
673 session->remote_service, session->session_id );
674 if(session->type == OSRF_SESSION_CLIENT
675 && session->state != OSRF_SESSION_DISCONNECTED ) { /* disconnect if we're a client */
676 osrfMessage* dis_msg = osrf_message_init( DISCONNECT, session->thread_trace, 1 );
677 _osrf_app_session_send( session, dis_msg );
678 osrfMessageFree(dis_msg);
681 /* Remove self from the global session cache */
683 osrfHashRemove( osrfAppSessionCache, session->session_id );
685 /* Free the memory */
687 if( session->userDataFree && session->userData )
688 session->userDataFree(session->userData);
690 if(session->session_locale)
691 free(session->session_locale);
693 free(session->remote_id);
694 free(session->orig_remote_id);
695 free(session->session_id);
696 free(session->remote_service);
697 osrfListFree(session->request_queue);
701 osrfMessage* osrfAppSessionRequestRecv(
702 osrfAppSession* session, int req_id, int timeout ) {
703 if(req_id < 0 || session == NULL)
705 osrfAppRequest* req = OSRF_LIST_GET_INDEX( session->request_queue, req_id );
706 return _osrf_app_request_recv( req, timeout );
711 int osrfAppRequestRespond( osrfAppSession* ses, int requestId, const jsonObject* data ) {
712 if(!ses || ! data ) return -1;
714 osrfMessage* msg = osrf_message_init( RESULT, requestId, 1 );
715 osrf_message_set_status_info( msg, NULL, "OK", OSRF_STATUS_OK );
716 char* json = jsonObjectToJSON( data );
718 osrf_message_set_result_content( msg, json );
719 _osrf_app_session_send( ses, msg );
722 osrfMessageFree( msg );
728 int osrfAppRequestRespondComplete(
729 osrfAppSession* ses, int requestId, const jsonObject* data ) {
731 osrfMessage* status = osrf_message_init( STATUS, requestId, 1);
732 osrf_message_set_status_info( status, "osrfConnectStatus", "Request Complete", OSRF_STATUS_COMPLETE );
735 osrfMessage* payload = osrf_message_init( RESULT, requestId, 1 );
736 osrf_message_set_status_info( payload, NULL, "OK", OSRF_STATUS_OK );
738 char* json = jsonObjectToJSON( data );
739 osrf_message_set_result_content( payload, json );
746 osrfAppSessionSendBatch( ses, ms, 2 );
748 osrfMessageFree( payload );
750 osrfAppSessionSendBatch( ses, &status, 1 );
753 osrfMessageFree( status );
758 int osrfAppSessionStatus( osrfAppSession* ses, int type,
759 const char* name, int reqId, const char* message ) {
762 osrfMessage* msg = osrf_message_init( STATUS, reqId, 1);
763 osrf_message_set_status_info( msg, name, message, type );
764 _osrf_app_session_send( ses, msg );
765 osrfMessageFree( msg );