1 /* --------------------------------------------------------------------
2 * Copyright (C) 2018 King County Library Service
3 * Bill Erickson <berickxx@gmail.com>
5 * Code borrows heavily from osrf_websocket_translator.c
7 * This program is free software; you can redistribute it and/or
8 * modify it under the terms of the GNU General Public License
9 * as published by the Free Software Foundation; either version 2
10 * of the License, or (at your option) any later version.
12 * This program is distributed in the hope that it will be useful,
13 * but WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15 * GNU General Public License for more details.
16 --------------------------------------------------------------------- */
/**
 * OpenSRF Websockets Relay
 *
 * Reads websocket requests on STDIN
 * Sends replies to requests on STDOUT
 *
 * Built to function with websocketd:
 * https://github.com/joewalnes/websocketd
 *
 * Example:
 *
 * websocketd --port 7682 --max-forks 250 ./osrf-websocket-stdio /path/to/opensrf_core.xml &
 */
#include <errno.h>
#include <signal.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/select.h>
#include <sys/types.h>
#include <time.h>
#include <unistd.h>

#include <opensrf/utils.h>
#include <opensrf/osrf_hash.h>
#include <opensrf/transport_client.h>
#include <opensrf/osrf_message.h>
#include <opensrf/osrf_app_session.h>
#include <opensrf/log.h>
// Maximum accepted length of client-supplied "thread" and "log_xid" values.
#define MAX_THREAD_SIZE 64
// Size of the scratch buffer used to build recipient addresses.
#define RECIP_BUF_SIZE 256
// Ingress tag applied to sessions and messages relayed by this process.
#define WEBSOCKET_INGRESS "ws-translator-v2"

// Maximum number of active, CONNECTed opensrf sessions allowed.  In
// practice this number will be very small, rarely reaching double
// digits.  This is just a security back-stop: a client trying to open
// this many connections is almost certainly attempting a
// denial-of-service attack.
#define MAX_ACTIVE_STATEFUL_SESSIONS 64

// Messages exceeding this size (10 MiB) are discarded.
// This value must be greater than RESET_MESSAGE_SIZE (below).
#define MAX_MESSAGE_SIZE 10485760

// After processing any message this size (100 KiB) or larger, free and
// recreate the stdin buffer to release the memory.
#define RESET_MESSAGE_SIZE 102400

// Enforce the documented relationship at compile time.
#if MAX_MESSAGE_SIZE <= RESET_MESSAGE_SIZE
#error "MAX_MESSAGE_SIZE must be greater than RESET_MESSAGE_SIZE"
#endif

// After receiving the initial shutdown request, wake the event loop every
// SHUTDOWN_POLL_INTERVAL_SECONDS to see if we can shut down.
#define SHUTDOWN_POLL_INTERVAL_SECONDS 1

// Attempt to gracefully disconnect the client until
// SHUTDOWN_MAX_GRACEFUL_SECONDS has passed without a shutdown
// opportunity, at which point force-close the connection.
#define SHUTDOWN_MAX_GRACEFUL_SECONDS 120
74 // Incremented with every REQUEST, decremented with every COMPLETE.
75 static int requests_in_flight = 0;
77 // default values, replaced during setup (below) as needed.
78 static char* config_file = "/openils/conf/opensrf_core.xml";
79 static char* config_ctxt = "gateway";
80 static char* osrf_router = NULL;
81 static char* osrf_domain = NULL;
83 // Cache of opensrf thread strings and back-end receipients.
84 // Tracking this here means the caller only needs to track the thread.
85 // It also means we don't have to expose internal XMPP IDs
86 static osrfHash* stateful_session_cache = NULL;
87 // Message on STDIN go into our reusable buffer
88 static growing_buffer* stdin_buf = NULL;
89 // OpenSRF XMPP connection handle
90 static transport_client* osrf_handle = NULL;
91 // Reusable string buf for recipient addresses
92 static char recipient_buf[RECIP_BUF_SIZE];
93 // Websocket client IP address (for logging)
94 static char* client_ip = NULL;
96 static void rebuild_stdin_buffer();
97 static void child_init(int argc, char* argv[]);
98 static void read_from_stdin();
99 static void relay_stdin_message(const char*);
100 static char* extract_inbound_messages();
101 static void log_request(const char*, osrfMessage*);
102 static void read_from_osrf();
103 static void read_one_osrf_message(transport_message*);
104 static int shut_it_down(int);
105 static void release_hash_string(char*, void*);
106 static int can_shutdown_gracefully();
// websocketd closes STDIN on shutdown, followed by SIGTERM.  To request a
// graceful shutdown instead, send SIGUSR1 to the backend processes (or
// their parent process group); websocketd ignores SIGUSR1.
//
// Time at which the shutdown request arrived, or 0 if none has arrived.
// volatile: written by the signal handler, read by the main loop.
static volatile time_t shutdown_requested = 0;

// SIGUSR1 handler: record when graceful shutdown was requested.
// Only async-signal-safe calls (signal(), time()) are made here; logging
// is deliberately left to the main loop because it is not safe to call
// from a signal handler.  errno is preserved for the interrupted code.
static void sigusr1_handler(int sig) {
    int saved_errno = errno;
    (void) sig;

    // Re-install for platforms where signal() resets the disposition.
    signal(SIGUSR1, sigusr1_handler);
    shutdown_requested = time(NULL);

    errno = saved_errno;
}
119 int main(int argc, char* argv[]) {
121 // Handle shutdown signal -- only needed once.
122 signal(SIGUSR1, sigusr1_handler);
124 // Connect to OpenSR -- exits on error
125 child_init(argc, argv);
127 // Disable output buffering.
128 setbuf(stdout, NULL);
129 rebuild_stdin_buffer();
131 // The main loop waits for data to be available on both STDIN
132 // (websocket client request) and the OpenSRF XMPP socket
133 // (replies returning to the websocket client).
135 int stdin_no = fileno(stdin);
136 int osrf_no = osrf_handle->session->sock_id;
137 int maxfd = osrf_no > stdin_no ? osrf_no : stdin_no;
144 FD_SET(osrf_no, &fds);
145 FD_SET(stdin_no, &fds);
147 if (shutdown_requested) {
151 tv.tv_sec = SHUTDOWN_POLL_INTERVAL_SECONDS;
153 // Wait indefinitely for activity to process
154 sel_resp = select(maxfd + 1, &fds, NULL, NULL, &tv);
158 // Wait indefinitely for activity to process.
159 // This will be interrupted during a shutdown request signal.
160 sel_resp = select(maxfd + 1, &fds, NULL, NULL, NULL);
163 if (sel_resp < 0) { // error
165 if (errno == EINTR) {
166 // Interrupted by a signal. Start the loop over.
167 // Could be a SIGNUSR1 shutdown request.
171 osrfLogError(OSRF_LOG_MARK,
172 "WS select() failed with [%s]. Exiting", strerror(errno));
179 if (FD_ISSET(stdin_no, &fds)) {
183 if (FD_ISSET(osrf_no, &fds)) {
188 if (shutdown_requested) {
189 shutdown_stat = can_shutdown_gracefully();
191 if (shutdown_stat == 0) {
192 // continue graceful shutdown cycle
196 // graceful shutdown cycle has completed either successfully
198 return shut_it_down(shutdown_stat > 0 ? 0 : 1);
202 return shut_it_down(0);
205 // Returns 1 if graceful shutdown is OK.
206 // Returns 0 if graceful shutdown cycle should continue.
207 // Returns -1 if the graceful shutdown cycle timed out.
208 static int can_shutdown_gracefully() {
210 time_t cycle_time = time(NULL) - shutdown_requested;
211 if (cycle_time > SHUTDOWN_MAX_GRACEFUL_SECONDS) {
212 osrfLogWarning(OSRF_LOG_MARK, "Timeout during graceful shutdown");
216 unsigned long active_sessions = osrfHashGetCount(stateful_session_cache);
217 if (active_sessions == 0 && requests_in_flight == 0) {
218 osrfLogInfo(OSRF_LOG_MARK, "Graceful shutdown cycle complete");
222 osrfLogInfo(OSRF_LOG_MARK, "Graceful shutdown cycle continuing with "
223 "sessions=%d requests=%d", active_sessions, requests_in_flight);
228 static void rebuild_stdin_buffer() {
230 if (stdin_buf != NULL) {
231 buffer_free(stdin_buf);
234 stdin_buf = buffer_init(1024);
237 static int shut_it_down(int stat) {
238 osrfHashFree(stateful_session_cache);
239 buffer_free(stdin_buf);
240 osrf_system_shutdown(); // clean XMPP disconnect
246 // Connect to OpenSRF/XMPP
247 // Apply settings and command line args.
248 static void child_init(int argc, char* argv[]) {
251 config_file = argv[1];
254 if (!osrf_system_bootstrap_client(config_file, config_ctxt) ) {
255 fprintf(stderr, "Cannot boostrap OSRF\n");
259 osrf_handle = osrfSystemGetTransportClient();
260 osrfAppSessionSetIngress(WEBSOCKET_INGRESS);
262 osrf_router = osrfConfigGetValue(NULL, "/router_name");
263 osrf_domain = osrfConfigGetValue(NULL, "/domain");
265 stateful_session_cache = osrfNewHash();
266 osrfHashSetCallback(stateful_session_cache, release_hash_string);
268 client_ip = getenv("REMOTE_ADDR");
269 osrfLogInfo(OSRF_LOG_MARK, "WS connect from %s", client_ip);
// osrfHash free callback for stateful_session_cache.  Every value is a
// strdup()'d string owned by the hash, so it is released here when the
// entry is removed.  The key argument is not used.
static void release_hash_string(char* key, void* str) {
    (void) key;
    free(str); // free(NULL) is a no-op
}
280 // Relay websocket client messages from STDIN to OpenSRF. Reads one
281 // message then returns, allowing responses to intermingle with long
282 // series of requests.
283 static void read_from_stdin() {
287 // Read one char at a time so we can stop at the first newline
288 // and leave any other data on the wire until read_from_stdin()
292 int stat = read(fileno(stdin), char_buf, 1);
296 if (errno == EAGAIN) {
297 // read interrupted. Return to main loop to resume.
298 // Returning here will leave any in-progress message in
299 // the stdin_buf. We return to the main select loop
300 // to confirm we really have more data to read and to
301 // perform additional error checking on the stream.
305 // All other errors reading STDIN are considered fatal.
306 osrfLogError(OSRF_LOG_MARK,
307 "WS STDIN read failed with [%s]. Exiting", strerror(errno));
312 if (stat == 0) { // EOF
313 osrfLogInfo(OSRF_LOG_MARK, "WS exiting on disconnect");
320 if (c == '\n') { // end of current message
322 if (stdin_buf->n_used >= MAX_MESSAGE_SIZE) {
323 osrfLogError(OSRF_LOG_MARK,
324 "WS message exceeded MAX_MESSAGE_SIZE, discarding");
325 rebuild_stdin_buffer();
329 if (stdin_buf->n_used > 0) {
330 relay_stdin_message(stdin_buf->buf);
332 if (stdin_buf->n_used >= RESET_MESSAGE_SIZE) {
333 // Current message is large. Rebuild the buffer
334 // to free the excess memory.
335 rebuild_stdin_buffer();
339 // Reset the buffer and carry on.
340 buffer_reset(stdin_buf);
348 if (stdin_buf->n_used >= MAX_MESSAGE_SIZE) {
349 // Message exceeds max message size. Continue reading
350 // and discarding data. NOTE: don't reset stdin_buf
351 // here becase we check n_used again once reading is done.
355 // Add the char to our current message buffer
356 buffer_add_char(stdin_buf, c);
361 // Relays a single websocket request to the OpenSRF/XMPP network.
362 static void relay_stdin_message(const char* msg_string) {
364 jsonObject *msg_wrapper = NULL; // free me
365 const jsonObject *tmp_obj = NULL;
366 const jsonObject *osrf_msg = NULL;
367 const char *service = NULL;
368 const char *thread = NULL;
369 const char *log_xid = NULL;
370 char *msg_body = NULL;
371 char *recipient = NULL;
373 // generate a new log trace for this request. it
374 // may be replaced by a client-provided trace below.
377 osrfLogInternal(OSRF_LOG_MARK, "WS received inbound message: %s", msg_string);
379 msg_wrapper = jsonParse(msg_string);
381 if (msg_wrapper == NULL) {
382 osrfLogWarning(OSRF_LOG_MARK, "WS Invalid JSON: %s", msg_string);
386 osrf_msg = jsonObjectGetKeyConst(msg_wrapper, "osrf_msg");
388 if ( (tmp_obj = jsonObjectGetKeyConst(msg_wrapper, "service")) )
389 service = jsonObjectGetString(tmp_obj);
391 if ( (tmp_obj = jsonObjectGetKeyConst(msg_wrapper, "thread")) )
392 thread = jsonObjectGetString(tmp_obj);
394 if ( (tmp_obj = jsonObjectGetKeyConst(msg_wrapper, "log_xid")) )
395 log_xid = jsonObjectGetString(tmp_obj);
399 // use the caller-provide log trace id
400 if (strlen(log_xid) > MAX_THREAD_SIZE) {
401 osrfLogWarning(OSRF_LOG_MARK, "WS log_xid exceeds max length");
405 osrfLogForceXid(log_xid);
410 if (strlen(thread) > MAX_THREAD_SIZE) {
411 osrfLogWarning(OSRF_LOG_MARK, "WS thread exceeds max length");
415 // since clients can provide their own threads at session start time,
416 // the presence of a thread does not guarantee a cached recipient
417 recipient = (char*) osrfHashGet(stateful_session_cache, thread);
420 osrfLogDebug(OSRF_LOG_MARK, "WS found cached recipient %s", recipient);
427 int size = snprintf(recipient_buf, RECIP_BUF_SIZE - 1,
428 "%s@%s/%s", osrf_router, osrf_domain, service);
429 recipient_buf[size] = '\0';
430 recipient = recipient_buf;
433 osrfLogWarning(OSRF_LOG_MARK, "WS Unable to determine recipient");
438 osrfLogDebug(OSRF_LOG_MARK,
439 "WS relaying message to opensrf thread=%s, recipient=%s",
442 // 'recipient' will be freed in extract_inbound_messages
443 // during a DISCONNECT call. Retain a local copy.
444 recipient = strdup(recipient);
446 msg_body = extract_inbound_messages(service, thread, osrf_msg);
448 osrfLogInternal(OSRF_LOG_MARK,
449 "WS relaying inbound message: %s", msg_body);
451 transport_message *tmsg = message_init(
452 msg_body, NULL, thread, recipient, NULL);
456 message_set_osrf_xid(tmsg, osrfLogGetXid());
458 if (client_send_message(osrf_handle, tmsg) != 0) {
459 osrfLogError(OSRF_LOG_MARK, "WS failed sending data to OpenSRF, exiting");
465 jsonObjectFree(msg_wrapper);
469 // Turn the OpenSRF message JSON into a set of osrfMessage's for
470 // analysis, ingress application, and logging.
471 static char* extract_inbound_messages(
472 const char* service, const char* thread, const jsonObject *osrf_msg) {
475 int num_msgs = osrf_msg->size;
477 osrfMessage* msg_list[num_msgs];
479 // here we do an extra json round-trip to get the data
480 // in a form osrf_message_deserialize can understand
481 // TODO: consider a version of osrf_message_init which can
482 // accept a jsonObject* instead of a JSON string.
483 char *osrf_msg_json = jsonObjectToJSON(osrf_msg);
484 osrf_message_deserialize(osrf_msg_json, msg_list, num_msgs);
487 // should we require the caller to always pass the service?
488 if (service == NULL) service = "";
490 for (i = 0; i < num_msgs; i++) {
492 osrfMessageSetIngress(msg, WEBSOCKET_INGRESS);
494 switch (msg->m_type) {
500 log_request(service, msg);
501 requests_in_flight++;
505 osrfHashRemove(stateful_session_cache, thread);
509 osrfLogError(OSRF_LOG_MARK, "WS received unexpected message "
510 "type from WebSocket client: %d", msg->m_type);
515 char* finalMsg = osrfMessageSerializeBatch(msg_list, num_msgs);
517 // clean up our messages
518 for (i = 0; i < num_msgs; i++)
519 osrfMessageFree(msg_list[i]);
524 // All REQUESTs are logged as activity.
525 static void log_request(const char* service, osrfMessage* msg) {
527 const jsonObject* params = msg->_params;
528 growing_buffer* act = buffer_init(128);
529 char* method = msg->method_name;
530 const jsonObject* obj = NULL;
533 int redactParams = 0;
535 buffer_fadd(act, "[%s] [%s] %s %s", client_ip, "", service, method);
537 while ( (str = osrfStringArrayGetString(log_protect_arr, i++)) ) {
538 if (!strncmp(method, str, strlen(str))) {
545 OSRF_BUFFER_ADD(act, " **PARAMS REDACTED**");
548 while ((obj = jsonObjectGetIndex(params, i++))) {
549 char* str = jsonObjectToJSON(obj);
551 OSRF_BUFFER_ADD(act, " ");
553 OSRF_BUFFER_ADD(act, ", ");
554 OSRF_BUFFER_ADD(act, str);
559 osrfLogActivity(OSRF_LOG_MARK, "%s", act->buf);
565 // Relay response messages from OpenSRF to STDIN
566 // Relays all available messages
567 static void read_from_osrf() {
568 transport_message* tmsg = NULL;
570 // Double check the socket connection before continuing.
571 if (!client_connected(osrf_handle) ||
572 !socket_connected(osrf_handle->session->sock_id)) {
573 osrfLogWarning(OSRF_LOG_MARK,
574 "WS: Jabber socket disconnected, exiting");
578 // Once client_recv is called all data waiting on the socket is
579 // read. This means we can't return to the main select() loop after
580 // each message, because any subsequent messages will get stuck in
581 // the opensrf receive queue. Process all available messages.
582 while ( (tmsg = client_recv(osrf_handle, 0)) ) {
583 read_one_osrf_message(tmsg);
588 // Process a single OpenSRF response message and print the reponse
589 // to STDOUT for delivery to the websocket client.
590 static void read_one_osrf_message(transport_message* tmsg) {
591 osrfList *msg_list = NULL;
592 osrfMessage *one_msg = NULL;
595 osrfLogDebug(OSRF_LOG_MARK,
596 "WS received opensrf response for thread=%s", tmsg->thread);
598 // first we need to perform some maintenance
599 msg_list = osrfMessageDeserialize(tmsg->body, NULL);
601 for (i = 0; i < msg_list->size; i++) {
602 one_msg = OSRF_LIST_GET_INDEX(msg_list, i);
604 osrfLogDebug(OSRF_LOG_MARK,
605 "WS returned response of type %d", one_msg->m_type);
607 /* if our client just successfully connected to an opensrf service,
608 cache the sender so that future calls on this thread will use
609 the correct recipient. */
610 if (one_msg && one_msg->m_type == STATUS) {
612 if (one_msg->status_code == OSRF_STATUS_OK) {
614 if (!osrfHashGet(stateful_session_cache, tmsg->thread)) {
616 unsigned long ses_size =
617 osrfHashGetCount(stateful_session_cache);
619 if (ses_size < MAX_ACTIVE_STATEFUL_SESSIONS) {
621 osrfLogDebug(OSRF_LOG_MARK, "WS caching sender "
622 "thread=%s, sender=%s; concurrent=%d",
623 tmsg->thread, tmsg->sender, ses_size);
625 char* sender = strdup(tmsg->sender); // free in *Remove
626 osrfHashSet(stateful_session_cache, sender, tmsg->thread);
630 osrfLogWarning(OSRF_LOG_MARK,
631 "WS max concurrent sessions (%d) reached. "
632 "Current session will not be tracked",
633 MAX_ACTIVE_STATEFUL_SESSIONS
640 // connection timed out; clear the cached recipient
641 if (one_msg->status_code == OSRF_STATUS_TIMEOUT) {
642 osrfHashRemove(stateful_session_cache, tmsg->thread);
646 if (one_msg->status_code == OSRF_STATUS_COMPLETE) {
647 requests_in_flight--;
654 // osrfMessageDeserialize applies the freeItem handler to the
655 // newly created osrfList. We only need to free the list and
656 // the individual osrfMessage's will be freed along with it
657 osrfListFree(msg_list);
659 // Pack the response into a websocket wrapper message.
660 jsonObject *msg_wrapper = NULL;
661 char *msg_string = NULL;
662 msg_wrapper = jsonNewObject(NULL);
664 jsonObjectSetKey(msg_wrapper, "thread", jsonNewObject(tmsg->thread));
665 jsonObjectSetKey(msg_wrapper, "log_xid", jsonNewObject(tmsg->osrf_xid));
666 jsonObjectSetKey(msg_wrapper, "osrf_msg", jsonParseRaw(tmsg->body));
668 if (tmsg->is_error) {
669 // tmsg->sender is the original recipient. they get swapped
671 osrfLogError(OSRF_LOG_MARK,
672 "WS received XMPP error message in response to thread=%s and "
673 "recipient=%s. Likely the recipient is not accessible/available.",
674 tmsg->thread, tmsg->sender);
675 jsonObjectSetKey(msg_wrapper, "transport_error", jsonNewBoolObject(1));
678 msg_string = jsonObjectToJSONRaw(msg_wrapper);
680 // Send the JSON to STDOUT
681 printf("%s\n", msg_string);
684 jsonObjectFree(msg_wrapper);