LP#1824184: Change potentially slow log statements to subroutines
[opensrf-equinox.git] / src / perl / lib / OpenSRF / Server.pm
index 91763d9..52c53d2 100644 (file)
@@ -21,15 +21,18 @@ use OpenSRF::Utils::Config;
 use OpenSRF::Transport::PeerHandle;
 use OpenSRF::Utils::SettingsClient;
 use OpenSRF::Utils::Logger qw($logger);
+use OpenSRF::DomainObject::oilsResponse qw/:status/;
 use OpenSRF::Transport::SlimJabber::Client;
 use Encode;
 use POSIX qw/:sys_wait_h :errno_h/;
 use Fcntl qw(F_GETFL F_SETFL O_NONBLOCK);
+use Time::HiRes qw/usleep/;
 use IO::Select;
 use Socket;
 our $chatty = 1; # disable for production
 
 use constant STATUS_PIPE_DATA_SIZE => 12;
+use constant WRITE_PIPE_DATA_SIZE  => 12;
 
 sub new {
     my($class, $service, %args) = @_;
@@ -41,12 +44,15 @@ sub new {
     $self->{routers}        = []; # list of registered routers
     $self->{active_list}    = []; # list of active children
     $self->{idle_list}      = []; # list of idle children
+    $self->{sighup_pending} = [];
     $self->{pid_map}        = {}; # map of child pid to child for cleaner access
+    $self->{sig_pipe}       = 0;  # true if last syswrite failed
 
     $self->{stderr_log} = $self->{stderr_log_path} . "/${service}_stderr.log" 
         if $self->{stderr_log_path};
 
     $self->{min_spare_children} ||= 0;
+    $self->{max_backlog_queue} ||= 1000;
 
     $self->{max_spare_children} = $self->{min_spare_children} + 1 if
         $self->{max_spare_children} and
@@ -61,9 +67,27 @@ sub new {
 sub cleanup {
     my $self = shift;
     my $no_exit = shift;
+    my $graceful = shift;
 
     $logger->info("server: shutting down and cleaning up...");
 
+    # de-register routers
+    $self->unregister_routers;
+
+    if ($graceful) {
+        # graceful shutdown waits for all active 
+        # children to complete their in-process tasks.
+
+        while (@{$self->{active_list}}) {
+            $logger->info("server: graceful shutdown with ".
+                @{$self->{active_list}}." active children...");
+
+            # block until a child is becomes available
+            $self->check_status(1);
+        }
+        $logger->info("server: all clear for graceful shutdown");
+    }
+
     # don't get sidetracked by signals while we're cleaning up.
     # it could result in unexpected behavior with list traversal
     $SIG{CHLD} = 'IGNORE';
@@ -72,9 +96,6 @@ sub cleanup {
     $self->kill_child($_) for
         (@{$self->{idle_list}}, @{$self->{active_list}});
 
-    # de-register routers
-    $self->unregister_routers;
-
     $self->{osrf_handle}->disconnect;
 
     # clean up our dead children
@@ -83,6 +104,34 @@ sub cleanup {
     exit(0) unless $no_exit;
 }
 
+# ----------------------------------------------------------------
+# SIGHUP handler.  Kill all idle children.  Copy list of active
+# children into sighup_pending list for later cleanup.
+# ----------------------------------------------------------------
+sub handle_sighup {
+    my $self = shift;
+    $logger->info("server: caught SIGHUP; reloading children");
+
+    # reload the opensrf config
+    # note: calling ::Config->load() results in ever-growing
+    # package names, which eventually causes an exception
+    OpenSRF::Utils::Config->current->_load(
+        force => 1,
+        config_file => OpenSRF::Utils::Config->current->FILE
+    );
+
+    # force-reload the logger config
+    OpenSRF::Utils::Logger::set_config(1);
+
+    # copy active list into pending list for later cleanup
+    $self->{sighup_pending} = [ @{$self->{active_list}} ];
+
+    # idle_list will be modified as children are reaped.
+    my @idle = @{$self->{idle_list}};
+
+    # idle children are the reaper's plaything
+    $self->kill_child($_) for @idle;
+}
 
 # ----------------------------------------------------------------
 # Waits on the jabber socket for inbound data from the router.
@@ -92,23 +141,37 @@ sub cleanup {
 sub run {
     my $self = shift;
 
-       $logger->set_service($self->{service});
+    $logger->set_service($self->{service});
 
-    $SIG{$_} = sub { $self->cleanup; } for (qw/INT TERM QUIT/);
+    $SIG{$_} = sub { $self->cleanup; } for (qw/INT QUIT/);
+    $SIG{TERM} = sub { $self->cleanup(0, 1); };
     $SIG{CHLD} = sub { $self->reap_children(); };
+    $SIG{HUP} = sub { $self->handle_sighup(); };
+    $SIG{USR1} = sub { $self->unregister_routers; };
+    $SIG{USR2} = sub { $self->register_routers; };
 
     $self->spawn_children;
     $self->build_osrf_handle;
     $self->register_routers;
     my $wait_time = 1;
 
+    my @max_children_msg_queue;
+
     # main server loop
     while(1) {
+        my $from_network = 0;
 
         $self->check_status;
         $self->{child_died} = 0;
 
-        my $msg = $self->{osrf_handle}->process($wait_time);
+        my $msg = shift(@max_children_msg_queue);
+
+        # no pending message, so wait for the next one forever
+        $from_network = $wait_time = -1 if (!$msg);
+        $msg ||= $self->{osrf_handle}->process($wait_time);
+
+        !$from_network and $chatty and $logger->debug("server: attempting to process previously queued message");
+        $from_network and $chatty and $logger->internal("server: no queued messages, processing due to network or signal");
 
         # we woke up for any reason, reset the wait time to allow
         # for idle maintenance as necessary
@@ -116,7 +179,11 @@ sub run {
 
         if($msg) {
 
-            if(my $child = pop(@{$self->{idle_list}})) {
+            if ($msg->type and $msg->type eq 'error') {
+                $logger->info("server: Listener received an XMPP error ".
+                    "message.  Likely a bounced message. sender=".$msg->from);
+
+            } elsif(my $child = pop(@{$self->{idle_list}})) {
 
                 # we have an idle child to handle the request
                 $chatty and $logger->internal("server: passing request to idle child $child");
@@ -133,11 +200,76 @@ sub run {
                 $logger->warn("server: no children available, waiting... consider increasing " .
                     "max_children for this application higher than $self->{max_children} ".
                     "in the OpenSRF configuration if this message occurs frequently");
-                $self->check_status(1); # block until child is available
 
-                my $child = pop(@{$self->{idle_list}});
-                push(@{$self->{active_list}}, $child);
-                $self->write_child($child, $msg);
+                if ($from_network) {
+                    $chatty and $logger->debug("server: queuing new message");
+                    push @max_children_msg_queue, $msg;
+                } else {
+                    $chatty and $logger->debug("server: re-queuing old message");
+                    unshift @max_children_msg_queue, $msg;
+                }
+
+                $logger->warn("server: backlog queue size is now ". scalar(@max_children_msg_queue));
+
+                if (@max_children_msg_queue < $self->{max_backlog_queue}) {
+                    # We still have room on the queue. Set the wait time to
+                    # 1s, waiting for a drone to be freed up and reprocess
+                    # this (and any other) queued messages.
+                    $wait_time = 1;
+                    if (!$from_network) {
+                        # if we got here, we had retrieved a message from the queue
+                        # but couldn't process it... but also hadn't fetched any
+                        # additional messages from the network. Doing so now,
+                        # as otherwise only one message will ever get queued
+                        $msg = $self->{osrf_handle}->process($wait_time);
+                        if ($msg) {
+                            $chatty and $logger->debug("server: queuing new message after a re-queue");
+                            push @max_children_msg_queue, $msg;
+                        }
+                    }
+                } else {
+
+                    if (!$from_network) {
+                        # The queue is full, and we just requeued a message. We'll
+                        # now see if there is a request available from the network;
+                        # if so, we'll see if a child is available again or else
+                        # drop it
+                        $msg = $self->{osrf_handle}->process($wait_time);
+                        if ($msg) {
+                            $self->check_status();
+                            if (@{$self->{idle_list}}) {
+                                # child now available, so we'll go ahead and queue it
+                                $chatty and $logger->debug("server: queuing new message after a re-queue with a full queue");
+                                push @max_children_msg_queue, $msg;
+                            } else {
+                                # ok, need to drop this one
+                                my $resp = OpenSRF::DomainObject::oilsMessage->new();
+                                $resp->type('STATUS');
+                                $resp->payload(
+                                    OpenSRF::DomainObject::oilsMethodException->new(
+                                        status => "Service unavailable: no available children and backlog queue at limit",
+                                        statusCode => STATUS_SERVICEUNAVAILABLE
+                                    )
+                                );
+                                $resp->threadTrace(1);
+
+                                $logger->set_osrf_xid($msg->osrf_xid);
+                                $self->{osrf_handle}->send(
+                                    to => $msg->from,
+                                    osrf_xid => $msg->osrf_xid, # Note that this is ignored, which
+                                                                # is why we called $logger->set_osrf_xid above.
+                                                                # We probably don't want that to be necessary
+                                                                # if osrf_xid is explicitly set here, but that'll
+                                                                # be a FIXME for later
+                                    thread => $msg->thread,
+                                    body => OpenSRF::Utils::JSON->perl2JSON([ $resp ])
+                                );
+                                $logger->warn("Backlog queue full for $self->{service}; forced to drop message " .
+                                              $msg->thread . " from " . $msg->from);
+                            }
+                        }
+                    }
+                }
             }
 
         } else {
@@ -164,7 +296,7 @@ sub run {
 sub perform_idle_maintenance {
     my $self = shift;
 
-    $chatty and $logger->internal(
+    $chatty and $logger->internal(sub{return
         sprintf(
             "server: %d idle, %d active, %d min_spare, %d max_spare in idle maintenance",
             scalar(@{$self->{idle_list}}), 
@@ -172,7 +304,7 @@ sub perform_idle_maintenance {
             $self->{min_spare_children},
             $self->{max_spare_children}
         )
-    );
+    });
 
     # spawn 1 spare child per maintenance loop if necessary
     if( $self->{min_spare_children} and
@@ -236,8 +368,40 @@ sub build_osrf_handle {
 # ----------------------------------------------------------------
 sub write_child {
     my($self, $child, $msg) = @_;
-    my $xml = decode_utf8($msg->to_xml);
-    syswrite($child->{pipe_to_child}, encode_utf8($xml));
+    my $xml = encode_utf8(decode_utf8($msg->to_xml));
+
+    # tell the child how much data to expect, minus the header
+    my $write_size;
+    {use bytes; $write_size = length($xml)}
+    $write_size = sprintf("%*s", WRITE_PIPE_DATA_SIZE, $write_size);
+
+    for (0..2) {
+
+        $self->{sig_pipe} = 0;
+        local $SIG{'PIPE'} = sub { $self->{sig_pipe} = 1; };
+
+        # In rare cases a child can die between creation and first
+        # write, typically a result of a jabber connect error.  Before
+        # sending data to each child, confirm it's still alive.  If it's
+        # not, log the error and drop the message to prevent the parent
+        # process from dying.
+        # When a child dies, all of its attributes are deleted,
+        # so the lack of a pid means the child is dead.
+        if (!$child->{pid}) {
+            $logger->error("server: child is dead in write_child(). ".
+                "unable to send message: $xml");
+            return; # avoid syswrite crash
+        }
+
+        # send message to child data pipe
+        syswrite($child->{pipe_to_child}, $write_size . $xml);
+
+        last unless $self->{sig_pipe};
+        $logger->error("server: got SIGPIPE writing to $child, retrying...");
+        usleep(50000); # 50 msec
+    }
+
+    $logger->error("server: unable to send request message to child $child") if $self->{sig_pipe};
 }
 
 # ----------------------------------------------------------------
@@ -249,9 +413,6 @@ sub check_status {
 
     return unless @{$self->{active_list}};
 
-    my $read_set = IO::Select->new;
-    $read_set->add($_->{pipe_to_child}) for @{$self->{active_list}};
-
     my @pids;
 
     while (1) {
@@ -259,6 +420,10 @@ sub check_status {
         # if can_read or sysread is interrupted while bloking, go back and 
         # wait again until we have at least 1 free child
 
+        # refresh the read_set handles in case we lost a child in the previous iteration
+        my $read_set = IO::Select->new;
+        $read_set->add($_->{pipe_to_child}) for @{$self->{active_list}};
+
         if(my @handles = $read_set->can_read(($block) ? undef : 0)) {
             my $pid = '';
             for my $pipe (@handles) {
@@ -272,7 +437,7 @@ sub check_status {
 
     return unless @pids;
 
-    $chatty and $logger->internal("server: ".scalar(@pids)." children reporting for duty: (@pids)");
+    $chatty and $logger->internal(sub{return "server: ".scalar(@pids)." children reporting for duty: (@pids)" });
 
     my $child;
     my @new_actives;
@@ -288,9 +453,30 @@ sub check_status {
 
     $self->{active_list} = [@new_actives];
 
-    $chatty and $logger->internal(sprintf(
+    $chatty and $logger->internal(sub{return sprintf(
         "server: %d idle and %d active children after status update",
-            scalar(@{$self->{idle_list}}), scalar(@{$self->{active_list}})));
+            scalar(@{$self->{idle_list}}), scalar(@{$self->{active_list}})) });
+
+    # some children just went from active to idle. let's see 
+    # if any of them need to be killed from a previous sighup.
+
+    for my $child (@{$self->{sighup_pending}}) {
+        if (grep {$_ == $child->{pid}} @pids) {
+
+            $chatty and $logger->internal(
+                "server: killing previously-active ".
+                "child after receiving SIGHUP: $child");
+
+            # remove the pending child
+            $self->{sighup_pending} = [
+                grep {$_->{pid} != $child->{pid}} 
+                    @{$self->{sighup_pending}}
+            ];
+
+            # kill the pending child
+            $self->kill_child($child)
+        }
+    }
 }
 
 # ----------------------------------------------------------------
@@ -323,10 +509,9 @@ sub reap_children {
 
     $self->spawn_children unless $shutdown;
 
-    $chatty and $logger->internal(sprintf(
+    $chatty and $logger->internal(sub{return sprintf(
         "server: %d idle and %d active children after reap_children",
-            scalar(@{$self->{idle_list}}), scalar(@{$self->{active_list}})));
-
+            scalar(@{$self->{idle_list}}), scalar(@{$self->{active_list}})) });
 }
 
 # ----------------------------------------------------------------
@@ -370,17 +555,19 @@ sub spawn_child {
             push(@{$self->{idle_list}}, $child);
         }
 
-        $chatty and $logger->internal("server: server spawned child $child with ".$self->{num_children}." total children");
+        $chatty and $logger->internal(sub{return "server: server spawned child $child with ".$self->{num_children}." total children" });
 
         return $child;
 
     } else { # child process
 
-        $SIG{$_} = 'DEFAULT' for (qw/INT TERM QUIT HUP/);
+        # recover default handling for any signal whose handler 
+        # may have been adopted from the parent process.
+        $SIG{$_} = 'DEFAULT' for qw/TERM INT QUIT HUP CHLD USR1 USR2/;
 
         if($self->{stderr_log}) {
 
-            $chatty and $logger->internal("server: redirecting STDERR to " . $self->{stderr_log});
+            $chatty and $logger->internal(sub{return "server: redirecting STDERR to " . $self->{stderr_log} });
 
             close STDERR;
             unless( open(STDERR, '>>' . $self->{stderr_log}) ) {
@@ -452,7 +639,7 @@ sub unregister_routers {
     my $self = shift;
     return unless $self->{osrf_handle}->tcp_connected;
 
-       for my $router (@{$self->{routers}}) {
+    for my $router (@{$self->{routers}}) {
         $logger->info("server: disconnecting from router $router");
         $self->{osrf_handle}->send(
             to => $router,
@@ -474,7 +661,7 @@ use OpenSRF::Transport::SlimJabber::XMPPMessage;
 use OpenSRF::Utils::Logger qw($logger);
 use OpenSRF::DomainObject::oilsResponse qw/:status/;
 use Fcntl qw(F_GETFL F_SETFL O_NONBLOCK);
-use Time::HiRes qw(time);
+use Time::HiRes qw(time usleep);
 use POSIX qw/:sys_wait_h :errno_h/;
 
 use overload '""' => sub { return '[' . shift()->{pid} . ']'; };
@@ -485,6 +672,7 @@ sub new {
     $self->{pid} = 0; # my process ID
     $self->{parent} = $parent; # Controller parent process
     $self->{num_requests} = 0; # total serviced requests
+    $self->{sig_pipe} = 0;  # true if last syswrite failed
     return $self;
 }
 
@@ -509,8 +697,8 @@ sub init {
     my $service = $self->{parent}->{service};
     $0 = "OpenSRF Drone [$service]";
     OpenSRF::Transport::PeerHandle->construct($service);
-       OpenSRF::Application->application_implementation->child_init
-               if (OpenSRF::Application->application_implementation->can('child_init'));
+    OpenSRF::Application->application_implementation->child_init
+        if (OpenSRF::Application->application_implementation->can('child_init'));
 }
 
 # ----------------------------------------------------------------
@@ -542,10 +730,16 @@ sub run {
             OpenSRF::Transport::SlimJabber::XMPPMessage->new(xml => $data)
         );
 
-        $self->keepalive_loop($session);
+        my $recycle = $self->keepalive_loop($session);
 
         last if ++$self->{num_requests} == $self->{parent}->{max_requests};
 
+        if ($recycle) {
+            $chatty && $logger->internal(
+                "server: child exiting early on force_recycle");
+            last;
+        }
+
         # Tell the parent process we are available to process requests
         $self->send_status;
 
@@ -555,8 +749,8 @@ sub run {
 
     $chatty and $logger->internal("server: child process shutting down after reaching max_requests");
 
-       OpenSRF::Application->application_implementation->child_exit
-               if (OpenSRF::Application->application_implementation->can('child_exit'));
+    OpenSRF::Application->application_implementation->child_exit
+        if (OpenSRF::Application->application_implementation->can('child_exit'));
 }
 
 # ----------------------------------------------------------------
@@ -565,32 +759,71 @@ sub run {
 sub wait_for_request {
     my $self = shift;
 
-    my $data = '';
-    my $read_size = 1024;
-    my $nonblock = 0;
+    my $data = ''; # final request data
+    my $buf_size = 4096; # default linux pipe_buf (atomic window, not total size)
+    my $read_pipe = $self->{pipe_to_parent};
+    my $bytes_needed; # size of the data we are about to receive
+    my $bytes_recvd; # number of bytes read so far
+    my $first_read = 1; # true for first loop iteration
+    my $read_error;
 
-    while(1) {
-        # Start out blocking, when data is available, read it all
+    while (1) {
 
-        my $buf = '';
-        my $n = sysread($self->{pipe_to_parent}, $buf, $read_size);
+        # wait for some data to start arriving
+        my $read_set = IO::Select->new;
+        $read_set->add($read_pipe);
+    
+        while (1) {
+            # if can_read is interrupted while blocking, 
+            # go back and wait again until it succeeds.
+            last if $read_set->can_read;
+        }
 
-        unless(defined $n) {
-            $logger->error("server: error reading data pipe: $!") unless EAGAIN == $!; 
-            last;
+        # parent started writing, let's start reading
+        $self->set_nonblock($read_pipe);
+
+        while (1) {
+            # read all of the available data
+
+            my $buf = '';
+            my $nbytes = sysread($self->{pipe_to_parent}, $buf, $buf_size);
+
+            unless(defined $nbytes) {
+                if ($! != EAGAIN) {
+                    $logger->error("server: error reading data from parent: $!.  ".
+                        "bytes_needed=$bytes_needed; bytes_recvd=$bytes_recvd; data=$data");
+                    $read_error = 1;
+                }
+                last;
+            }
+
+            last if $nbytes <= 0; # no more data available for reading
+
+            $bytes_recvd += $nbytes;
+            $data .= $buf;
         }
 
-        last if $n <= 0; # no data left to read
+        $self->set_block($self->{pipe_to_parent});
+        return undef if $read_error;
 
-        $data .= $buf;
+        # extract the data size and remove the header from the final data
+        if ($first_read) {
+            my $wps_size = OpenSRF::Server::WRITE_PIPE_DATA_SIZE;
+            $bytes_needed = int(substr($data, 0, $wps_size)) + $wps_size;
+            $data = substr($data, $wps_size);
+            $first_read = 0;
+        }
 
-        last if $n < $read_size; # done reading all data
 
-        $self->set_nonblock($self->{pipe_to_parent}) unless $nonblock;
-        $nonblock = 1;
+        if ($bytes_recvd == $bytes_needed) {
+            # we've read all the data. Nothing left to do
+            last;
+        }
+
+        $logger->info("server: child process read all available pipe data.  ".
+            "waiting for more data from parent.  bytes_needed=$bytes_needed; bytes_recvd=$bytes_recvd");
     }
 
-    $self->set_block($self->{pipe_to_parent}) if $nonblock;
     return $data;
 }
 
@@ -622,7 +855,15 @@ sub keepalive_loop {
     }
 
     $chatty and $logger->internal("server: child done with request(s)");
+
+    # Capture the recycle option value before it's clobbered.
+    # The option may be set at any point along the life of the 
+    # session.  Once set, it remains set unless 
+    # $session->force_recycle(0) is explicitly called.
+    my $recycle = $session->force_recycle;
+
     $session->kill_me;
+    return $recycle;
 }
 
 # ----------------------------------------------------------------
@@ -630,10 +871,23 @@ sub keepalive_loop {
 # ----------------------------------------------------------------
 sub send_status {
     my $self = shift;
-    syswrite(
-        $self->{pipe_to_parent},
-        sprintf("%*s", OpenSRF::Server::STATUS_PIPE_DATA_SIZE, $self->{pid})
-    );
+
+    for (0..2) {
+
+        $self->{sig_pipe} = 0;
+        local $SIG{'PIPE'} = sub { $self->{sig_pipe} = 1; };
+
+        syswrite(
+            $self->{pipe_to_parent},
+            sprintf("%*s", OpenSRF::Server::STATUS_PIPE_DATA_SIZE, $self->{pid})
+        );
+
+        last unless $self->{sig_pipe};
+        $logger->error("server: $self got SIGPIPE writing status to parent, retrying...");
+        usleep(50000); # 50 msec
+    }
+
+    $logger->error("server: $self unable to send status to parent") if $self->{sig_pipe};
 }