LP#1803182 Websocketd graceful shutdown support
[opensrf-equinox.git] / src / websocket-stdio / osrf-websocket-stdio.c
1 /* --------------------------------------------------------------------
2  * Copyright (C) 2018 King County Library Service
3  * Bill Erickson <berickxx@gmail.com>
4  *
5  * Code borrows heavily from osrf_websocket_translator.c
6  *
7  * This program is free software; you can redistribute it and/or
8  * modify it under the terms of the GNU General Public License
9  * as published by the Free Software Foundation; either version 2
10  * of the License, or (at your option) any later version.
11  *
12  * This program is distributed in the hope that it will be useful,
13  * but WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15  * GNU General Public License for more details.
16 --------------------------------------------------------------------- */
17
18 /**
19  * OpenSRF Websockets Relay
20  *
21  * Reads Websockets requests on STDIN
22  * Sends replies to requests on STDOUT
23  *
24  * Built to function with websocketd:
25  * https://github.com/joewalnes/websocketd
26  *
27  * Synopsis:
28  *
29  * websocketd --port 7682 --max-forks 250 ./osrf-websocket-stdio /path/to/opensrf_core.xml &
30  *
31  */
32
33 #include <stdio.h>
34 #include <unistd.h>
35 #include <string.h>
36 #include <signal.h>
37 #include <opensrf/utils.h>
38 #include <opensrf/osrf_hash.h>
39 #include <opensrf/transport_client.h>
40 #include <opensrf/osrf_message.h>
41 #include <opensrf/osrf_app_session.h>
42 #include <opensrf/log.h>
43
44 #define MAX_THREAD_SIZE 64
45 #define RECIP_BUF_SIZE 256
46 #define WEBSOCKET_INGRESS "ws-translator-v2"
47
48 // maximun number of active, CONNECTed opensrf sessions allowed. in
49 // practice, this number will be very small, rarely reaching double
50 // digits.  This is just a security back-stop.  A client trying to open
51 // this many connections is almost certainly attempting to DOS the
52 // gateway / server.
53 #define MAX_ACTIVE_STATEFUL_SESSIONS 64
54
55 // Message exceeding this size are discarded.
56 // This value must be greater than RESET_MESSAGE_SIZE (below)
57 // ~10M
58 #define MAX_MESSAGE_SIZE 10485760
59
60 // After processing any message this size or larger, free and
61 // recreate the stdin buffer to release the memory.
62 // ~100k
63 #define RESET_MESSAGE_SIZE 102400
64
65 // After receiving the initial shutdow call, wake the event loop every 
66 // SHUTDOWN_POLL_INTERVAL_SECONDS to see if we can shut down.
67 #define SHUTDOWN_POLL_INTERVAL_SECONDS 1
68
69 // Attempt to gracefully disconnect the client until 
70 // SHUTDOWN_MAX_GRACEFUL_SECONDS has passed without a shutdown
71 // opportunity, at which point force-close the connection.
72 #define SHUTDOWN_MAX_GRACEFUL_SECONDS 120
73
74 // Incremented with every REQUEST, decremented with every COMPLETE.
75 static int requests_in_flight = 0;
76
77 // default values, replaced during setup (below) as needed.
78 static char* config_file = "/openils/conf/opensrf_core.xml";
79 static char* config_ctxt = "gateway";
80 static char* osrf_router = NULL;
81 static char* osrf_domain = NULL;
82
83 // Cache of opensrf thread strings and back-end receipients.
84 // Tracking this here means the caller only needs to track the thread.
85 // It also means we don't have to expose internal XMPP IDs
86 static osrfHash* stateful_session_cache = NULL;
87 // Message on STDIN go into our reusable buffer
88 static growing_buffer* stdin_buf = NULL;
89 // OpenSRF XMPP connection handle
90 static transport_client* osrf_handle = NULL;
91 // Reusable string buf for recipient addresses
92 static char recipient_buf[RECIP_BUF_SIZE];
93 // Websocket client IP address (for logging)
94 static char* client_ip = NULL;
95
96 static void rebuild_stdin_buffer();
97 static void child_init(int argc, char* argv[]);
98 static void read_from_stdin();
99 static void relay_stdin_message(const char*);
100 static char* extract_inbound_messages();
101 static void log_request(const char*, osrfMessage*);
102 static void read_from_osrf();
103 static void read_one_osrf_message(transport_message*);
104 static int shut_it_down(int);
105 static void release_hash_string(char*, void*);
106 static int can_shutdown_gracefully();
107
108 // Websocketd closes STDIN on shutdown, followed by SIGTERM.
109 // Signal the back-ends it's time for graceful shutdown by 
110 // sending a SIGUSER1 to the backend processes (or parent 
111 // process group).  Websocket ignores SIGUSR1.
112 static time_t shutdown_requested = 0; 
113 static void sigusr1_handler(int sig) {
114     signal(SIGUSR1, sigusr1_handler);
115     osrfLogInfo(OSRF_LOG_MARK, "WS received SIGUSR1 -- graceful shutdown");
116     shutdown_requested = time(NULL);
117 }
118
119 int main(int argc, char* argv[]) {
120
121     // Handle shutdown signal -- only needed once.
122     signal(SIGUSR1, sigusr1_handler);
123
124     // Connect to OpenSR -- exits on error
125     child_init(argc, argv);
126
127     // Disable output buffering.
128     setbuf(stdout, NULL);
129     rebuild_stdin_buffer();
130
131     // The main loop waits for data to be available on both STDIN
132     // (websocket client request) and the OpenSRF XMPP socket 
133     // (replies returning to the websocket client).
134     fd_set fds;
135     int stdin_no = fileno(stdin);
136     int osrf_no = osrf_handle->session->sock_id;
137     int maxfd = osrf_no > stdin_no ? osrf_no : stdin_no;
138     int sel_resp;
139     int shutdown_stat;
140
141     while (1) {
142
143         FD_ZERO(&fds);
144         FD_SET(osrf_no, &fds);
145         FD_SET(stdin_no, &fds);
146
147         if (shutdown_requested) {
148
149             struct timeval tv;
150             tv.tv_usec = 0;
151             tv.tv_sec = SHUTDOWN_POLL_INTERVAL_SECONDS;
152     
153             // Wait indefinitely for activity to process
154             sel_resp = select(maxfd + 1, &fds, NULL, NULL, &tv);
155
156         } else {
157
158             // Wait indefinitely for activity to process.
159             // This will be interrupted during a shutdown request signal.
160             sel_resp = select(maxfd + 1, &fds, NULL, NULL, NULL);
161         }
162
163         if (sel_resp < 0) { // error
164
165             if (errno == EINTR) {
166                 // Interrupted by a signal.  Start the loop over.
167                 // Could be a SIGNUSR1 shutdown request.
168                 continue;
169             }
170
171             osrfLogError(OSRF_LOG_MARK,
172                 "WS select() failed with [%s]. Exiting", strerror(errno));
173
174             shut_it_down(1);
175         }
176
177         if (sel_resp > 0) {
178
179             if (FD_ISSET(stdin_no, &fds)) {
180                 read_from_stdin();
181             }
182
183             if (FD_ISSET(osrf_no, &fds)) {
184                 read_from_osrf();
185             }
186         }
187
188         if (shutdown_requested) {
189             shutdown_stat = can_shutdown_gracefully();
190
191             if (shutdown_stat == 0) {
192                 // continue graceful shutdown cycle
193                 continue; 
194             }
195             
196             // graceful shutdown cycle has completed either successfully
197             // or via timeout.
198             return shut_it_down(shutdown_stat > 0 ? 0 : 1);
199         }
200     }
201
202     return shut_it_down(0);
203 }
204
205 // Returns 1 if graceful shutdown is OK.
206 // Returns 0 if graceful shutdown cycle should continue.
207 // Returns -1 if the graceful shutdown cycle timed out.
208 static int can_shutdown_gracefully() {
209
210     time_t cycle_time = time(NULL) - shutdown_requested;
211     if (cycle_time > SHUTDOWN_MAX_GRACEFUL_SECONDS) {
212         osrfLogWarning(OSRF_LOG_MARK, "Timeout during graceful shutdown");
213         return -1;
214     }
215
216     unsigned long active_sessions = osrfHashGetCount(stateful_session_cache);
217     if (active_sessions == 0 && requests_in_flight == 0) {
218         osrfLogInfo(OSRF_LOG_MARK, "Graceful shutdown cycle complete");
219         return 1;
220     }
221
222     osrfLogInfo(OSRF_LOG_MARK, "Graceful shutdown cycle continuing with " 
223         "sessions=%d requests=%d", active_sessions, requests_in_flight);
224
225     return 0;
226 }
227
228 static void rebuild_stdin_buffer() {
229
230     if (stdin_buf != NULL) {
231         buffer_free(stdin_buf);
232     }
233
234     stdin_buf = buffer_init(1024);
235 }
236
237 static int shut_it_down(int stat) {
238     osrfHashFree(stateful_session_cache);
239     buffer_free(stdin_buf);
240     osrf_system_shutdown(); // clean XMPP disconnect
241     exit(stat);
242     return stat;
243 }
244
245
246 // Connect to OpenSRF/XMPP
247 // Apply settings and command line args.
248 static void child_init(int argc, char* argv[]) {
249
250     if (argc > 1) {
251         config_file = argv[1];
252     }
253
254     if (!osrf_system_bootstrap_client(config_file, config_ctxt) ) {
255         fprintf(stderr, "Cannot boostrap OSRF\n");
256         shut_it_down(1);
257     }
258
259         osrf_handle = osrfSystemGetTransportClient();
260         osrfAppSessionSetIngress(WEBSOCKET_INGRESS);
261
262     osrf_router = osrfConfigGetValue(NULL, "/router_name");
263     osrf_domain = osrfConfigGetValue(NULL, "/domain");
264
265     stateful_session_cache = osrfNewHash();
266     osrfHashSetCallback(stateful_session_cache, release_hash_string);
267
268     client_ip = getenv("REMOTE_ADDR");
269     osrfLogInfo(OSRF_LOG_MARK, "WS connect from %s", client_ip);
270 }
271
272 // Called by osrfHash when a string is removed.  We strdup each
273 // string before it goes into the hash.
274 static void release_hash_string(char* key, void* str) {
275     if (str == NULL) return;
276     free((char*) str);
277 }
278
279
280 // Relay websocket client messages from STDIN to OpenSRF.  Reads one
281 // message then returns, allowing responses to intermingle with long
282 // series of requests.
283 static void read_from_stdin() {
284     char char_buf[1];
285     char c;
286
287     // Read one char at a time so we can stop at the first newline
288     // and leave any other data on the wire until read_from_stdin()
289     // is called again.
290
291     while (1) {
292         int stat = read(fileno(stdin), char_buf, 1);
293
294         if (stat < 0) {
295
296             if (errno == EAGAIN) {
297                 // read interrupted.  Return to main loop to resume.
298                 // Returning here will leave any in-progress message in
299                 // the stdin_buf.  We return to the main select loop
300                 // to confirm we really have more data to read and to
301                 // perform additional error checking on the stream.
302                 return;
303             }
304
305             // All other errors reading STDIN are considered fatal.
306             osrfLogError(OSRF_LOG_MARK,
307                 "WS STDIN read failed with [%s]. Exiting", strerror(errno));
308             shut_it_down(1);
309             return;
310         }
311
312         if (stat == 0) { // EOF
313             osrfLogInfo(OSRF_LOG_MARK, "WS exiting on disconnect");
314             shut_it_down(0);
315             return;
316         }
317
318         c = char_buf[0];
319
320         if (c == '\n') { // end of current message
321
322             if (stdin_buf->n_used >= MAX_MESSAGE_SIZE) {
323                 osrfLogError(OSRF_LOG_MARK,
324                     "WS message exceeded MAX_MESSAGE_SIZE, discarding");
325                 rebuild_stdin_buffer();
326                 return;
327             }
328
329             if (stdin_buf->n_used > 0) {
330                 relay_stdin_message(stdin_buf->buf);
331
332                 if (stdin_buf->n_used >= RESET_MESSAGE_SIZE) {
333                     // Current message is large.  Rebuild the buffer
334                     // to free the excess memory.
335                     rebuild_stdin_buffer();
336
337                 } else {
338
339                     // Reset the buffer and carry on.
340                     buffer_reset(stdin_buf);
341                 }
342             }
343
344             return;
345
346         } else {
347
348             if (stdin_buf->n_used >= MAX_MESSAGE_SIZE) {
349                 // Message exceeds max message size.  Continue reading
350                 // and discarding data.  NOTE: don't reset stdin_buf
351                 // here becase we check n_used again once reading is done.
352                 continue;
353             }
354
355             // Add the char to our current message buffer
356             buffer_add_char(stdin_buf, c);
357         }
358     }
359 }
360
361 // Relays a single websocket request to the OpenSRF/XMPP network.
362 static void relay_stdin_message(const char* msg_string) {
363
364     jsonObject *msg_wrapper = NULL; // free me
365     const jsonObject *tmp_obj = NULL;
366     const jsonObject *osrf_msg = NULL;
367     const char *service = NULL;
368     const char *thread = NULL;
369     const char *log_xid = NULL;
370     char *msg_body = NULL;
371     char *recipient = NULL;
372
373     // generate a new log trace for this request. it
374     // may be replaced by a client-provided trace below.
375     osrfLogMkXid();
376
377     osrfLogInternal(OSRF_LOG_MARK, "WS received inbound message: %s", msg_string);
378
379     msg_wrapper = jsonParse(msg_string);
380
381     if (msg_wrapper == NULL) {
382         osrfLogWarning(OSRF_LOG_MARK, "WS Invalid JSON: %s", msg_string);
383         return;
384     }
385
386     osrf_msg = jsonObjectGetKeyConst(msg_wrapper, "osrf_msg");
387
388     if ( (tmp_obj = jsonObjectGetKeyConst(msg_wrapper, "service")) )
389         service = jsonObjectGetString(tmp_obj);
390
391     if ( (tmp_obj = jsonObjectGetKeyConst(msg_wrapper, "thread")) )
392         thread = jsonObjectGetString(tmp_obj);
393
394     if ( (tmp_obj = jsonObjectGetKeyConst(msg_wrapper, "log_xid")) )
395         log_xid = jsonObjectGetString(tmp_obj);
396
397     if (log_xid) {
398
399         // use the caller-provide log trace id
400         if (strlen(log_xid) > MAX_THREAD_SIZE) {
401             osrfLogWarning(OSRF_LOG_MARK, "WS log_xid exceeds max length");
402             return;
403         }
404
405         osrfLogForceXid(log_xid);
406     }
407
408     if (thread) {
409
410         if (strlen(thread) > MAX_THREAD_SIZE) {
411             osrfLogWarning(OSRF_LOG_MARK, "WS thread exceeds max length");
412             return;
413         }
414
415         // since clients can provide their own threads at session start time,
416         // the presence of a thread does not guarantee a cached recipient
417         recipient = (char*) osrfHashGet(stateful_session_cache, thread);
418
419         if (recipient) {
420             osrfLogDebug(OSRF_LOG_MARK, "WS found cached recipient %s", recipient);
421         }
422     }
423
424     if (!recipient) {
425
426         if (service) {
427             int size = snprintf(recipient_buf, RECIP_BUF_SIZE - 1,
428                 "%s@%s/%s", osrf_router, osrf_domain, service);
429             recipient_buf[size] = '\0';
430             recipient = recipient_buf;
431
432         } else {
433             osrfLogWarning(OSRF_LOG_MARK, "WS Unable to determine recipient");
434             return;
435         }
436     }
437
438     osrfLogDebug(OSRF_LOG_MARK,
439         "WS relaying message to opensrf thread=%s, recipient=%s",
440             thread, recipient);
441
442     // 'recipient' will be freed in extract_inbound_messages
443     // during a DISCONNECT call.  Retain a local copy.
444     recipient = strdup(recipient);
445
446     msg_body = extract_inbound_messages(service, thread, osrf_msg);
447
448     osrfLogInternal(OSRF_LOG_MARK,
449         "WS relaying inbound message: %s", msg_body);
450
451     transport_message *tmsg = message_init(
452         msg_body, NULL, thread, recipient, NULL);
453
454     free(recipient);
455
456     message_set_osrf_xid(tmsg, osrfLogGetXid());
457
458     if (client_send_message(osrf_handle, tmsg) != 0) {
459         osrfLogError(OSRF_LOG_MARK, "WS failed sending data to OpenSRF, exiting");
460         shut_it_down(1);
461     }
462
463     osrfLogClearXid();
464     message_free(tmsg);
465     jsonObjectFree(msg_wrapper);
466     free(msg_body);
467 }
468
469 // Turn the OpenSRF message JSON into a set of osrfMessage's for 
470 // analysis, ingress application, and logging.
471 static char* extract_inbound_messages(
472         const char* service, const char* thread, const jsonObject *osrf_msg) {
473
474     int i;
475     int num_msgs = osrf_msg->size;
476     osrfMessage* msg;
477     osrfMessage* msg_list[num_msgs];
478
479     // here we do an extra json round-trip to get the data
480     // in a form osrf_message_deserialize can understand
481     // TODO: consider a version of osrf_message_init which can
482     // accept a jsonObject* instead of a JSON string.
483     char *osrf_msg_json = jsonObjectToJSON(osrf_msg);
484     osrf_message_deserialize(osrf_msg_json, msg_list, num_msgs);
485     free(osrf_msg_json);
486
487     // should we require the caller to always pass the service?
488     if (service == NULL) service = "";
489
490     for (i = 0; i < num_msgs; i++) {
491         msg = msg_list[i];
492         osrfMessageSetIngress(msg, WEBSOCKET_INGRESS);
493
494         switch (msg->m_type) {
495
496             case CONNECT:
497                 break;
498
499             case REQUEST:
500                 log_request(service, msg);
501                 requests_in_flight++;
502                 break;
503
504             case DISCONNECT:
505                 osrfHashRemove(stateful_session_cache, thread);
506                 break;
507
508             default:
509                 osrfLogError(OSRF_LOG_MARK, "WS received unexpected message "
510                     "type from WebSocket client: %d", msg->m_type);
511                 break;
512         }
513     }
514
515     char* finalMsg = osrfMessageSerializeBatch(msg_list, num_msgs);
516
517     // clean up our messages
518     for (i = 0; i < num_msgs; i++)
519         osrfMessageFree(msg_list[i]);
520
521     return finalMsg;
522 }
523
524 // All REQUESTs are logged as activity.
525 static void log_request(const char* service, osrfMessage* msg) {
526
527     const jsonObject* params = msg->_params;
528     growing_buffer* act = buffer_init(128);
529     char* method = msg->method_name;
530     const jsonObject* obj = NULL;
531     int i = 0;
532     const char* str;
533     int redactParams = 0;
534
535     buffer_fadd(act, "[%s] [%s] %s %s", client_ip, "", service, method);
536
537     while ( (str = osrfStringArrayGetString(log_protect_arr, i++)) ) {
538         if (!strncmp(method, str, strlen(str))) {
539             redactParams = 1;
540             break;
541         }
542     }
543
544     if (redactParams) {
545         OSRF_BUFFER_ADD(act, " **PARAMS REDACTED**");
546     } else {
547         i = 0;
548         while ((obj = jsonObjectGetIndex(params, i++))) {
549             char* str = jsonObjectToJSON(obj);
550             if (i == 1)
551                 OSRF_BUFFER_ADD(act, " ");
552             else
553                 OSRF_BUFFER_ADD(act, ", ");
554             OSRF_BUFFER_ADD(act, str);
555             free(str);
556         }
557     }
558
559     osrfLogActivity(OSRF_LOG_MARK, "%s", act->buf);
560     buffer_free(act);
561 }
562
563
564
565 // Relay response messages from OpenSRF to STDIN
566 // Relays all available messages
567 static void read_from_osrf() {
568     transport_message* tmsg = NULL;
569
570     // Double check the socket connection before continuing.
571     if (!client_connected(osrf_handle) ||
572         !socket_connected(osrf_handle->session->sock_id)) {
573         osrfLogWarning(OSRF_LOG_MARK,
574             "WS: Jabber socket disconnected, exiting");
575         shut_it_down(1);
576     }
577
578     // Once client_recv is called all data waiting on the socket is
579     // read.  This means we can't return to the main select() loop after
580     // each message, because any subsequent messages will get stuck in
581     // the opensrf receive queue. Process all available messages.
582     while ( (tmsg = client_recv(osrf_handle, 0)) ) {
583         read_one_osrf_message(tmsg);
584         message_free(tmsg);
585     }
586 }
587
588 // Process a single OpenSRF response message and print the reponse
589 // to STDOUT for delivery to the websocket client.
590 static void read_one_osrf_message(transport_message* tmsg) {
591     osrfList *msg_list = NULL;
592     osrfMessage *one_msg = NULL;
593     int i;
594
595     osrfLogDebug(OSRF_LOG_MARK,
596         "WS received opensrf response for thread=%s", tmsg->thread);
597
598     // first we need to perform some maintenance
599     msg_list = osrfMessageDeserialize(tmsg->body, NULL);
600
601     for (i = 0; i < msg_list->size; i++) {
602         one_msg = OSRF_LIST_GET_INDEX(msg_list, i);
603
604         osrfLogDebug(OSRF_LOG_MARK,
605             "WS returned response of type %d", one_msg->m_type);
606
607         /*  if our client just successfully connected to an opensrf service,
608             cache the sender so that future calls on this thread will use
609             the correct recipient. */
610         if (one_msg && one_msg->m_type == STATUS) {
611
612             if (one_msg->status_code == OSRF_STATUS_OK) {
613
614                 if (!osrfHashGet(stateful_session_cache, tmsg->thread)) {
615
616                     unsigned long ses_size =
617                         osrfHashGetCount(stateful_session_cache);
618
619                     if (ses_size < MAX_ACTIVE_STATEFUL_SESSIONS) {
620
621                         osrfLogDebug(OSRF_LOG_MARK, "WS caching sender "
622                             "thread=%s, sender=%s; concurrent=%d",
623                             tmsg->thread, tmsg->sender, ses_size);
624
625                         char* sender = strdup(tmsg->sender); // free in *Remove
626                         osrfHashSet(stateful_session_cache, sender, tmsg->thread);
627
628                     } else {
629
630                         osrfLogWarning(OSRF_LOG_MARK,
631                             "WS max concurrent sessions (%d) reached.  "
632                             "Current session will not be tracked",
633                             MAX_ACTIVE_STATEFUL_SESSIONS
634                         );
635                     }
636                 }
637
638             } else {
639
640                 // connection timed out; clear the cached recipient
641                 if (one_msg->status_code == OSRF_STATUS_TIMEOUT) {
642                     osrfHashRemove(stateful_session_cache, tmsg->thread);
643
644                 } else {
645
646                     if (one_msg->status_code == OSRF_STATUS_COMPLETE) {
647                         requests_in_flight--;
648                     }
649                 }
650             }
651         }
652     }
653
654     // osrfMessageDeserialize applies the freeItem handler to the
655     // newly created osrfList.  We only need to free the list and
656     // the individual osrfMessage's will be freed along with it
657     osrfListFree(msg_list);
658
659     // Pack the response into a websocket wrapper message.
660     jsonObject *msg_wrapper = NULL;
661     char *msg_string = NULL;
662     msg_wrapper = jsonNewObject(NULL);
663
664     jsonObjectSetKey(msg_wrapper, "thread", jsonNewObject(tmsg->thread));
665     jsonObjectSetKey(msg_wrapper, "log_xid", jsonNewObject(tmsg->osrf_xid));
666     jsonObjectSetKey(msg_wrapper, "osrf_msg", jsonParseRaw(tmsg->body));
667
668     if (tmsg->is_error) {
669         // tmsg->sender is the original recipient. they get swapped
670         // in error replies.
671         osrfLogError(OSRF_LOG_MARK,
672             "WS received XMPP error message in response to thread=%s and "
673             "recipient=%s.  Likely the recipient is not accessible/available.",
674             tmsg->thread, tmsg->sender);
675         jsonObjectSetKey(msg_wrapper, "transport_error", jsonNewBoolObject(1));
676     }
677
678     msg_string = jsonObjectToJSONRaw(msg_wrapper);
679
680     // Send the JSON to STDOUT
681     printf("%s\n", msg_string);
682
683     free(msg_string);
684     jsonObjectFree(msg_wrapper);
685 }
686
687