LP#1824184: Change potentially slow log statements to subroutines
[opensrf-equinox.git] / src / perl / lib / OpenSRF / Server.pm
index dcf44fe..52c53d2 100644 (file)
@@ -21,6 +21,7 @@ use OpenSRF::Utils::Config;
 use OpenSRF::Transport::PeerHandle;
 use OpenSRF::Utils::SettingsClient;
 use OpenSRF::Utils::Logger qw($logger);
+use OpenSRF::DomainObject::oilsResponse qw/:status/;
 use OpenSRF::Transport::SlimJabber::Client;
 use Encode;
 use POSIX qw/:sys_wait_h :errno_h/;
@@ -51,6 +52,7 @@ sub new {
         if $self->{stderr_log_path};
 
     $self->{min_spare_children} ||= 0;
+    $self->{max_backlog_queue} ||= 1000;
 
     $self->{max_spare_children} = $self->{min_spare_children} + 1 if
         $self->{max_spare_children} and
@@ -153,13 +155,23 @@ sub run {
     $self->register_routers;
     my $wait_time = 1;
 
+    my @max_children_msg_queue;
+
     # main server loop
     while(1) {
+        my $from_network = 0;
 
         $self->check_status;
         $self->{child_died} = 0;
 
-        my $msg = $self->{osrf_handle}->process($wait_time);
+        my $msg = shift(@max_children_msg_queue);
+
+        # no pending message, so wait for the next one forever
+        $from_network = $wait_time = -1 if (!$msg);
+        $msg ||= $self->{osrf_handle}->process($wait_time);
+
+        !$from_network and $chatty and $logger->debug("server: attempting to process previously queued message");
+        $from_network and $chatty and $logger->internal("server: no queued messages, processing due to network or signal");
 
         # we woke up for any reason, reset the wait time to allow
         # for idle maintenance as necessary
@@ -188,11 +200,76 @@ sub run {
                 $logger->warn("server: no children available, waiting... consider increasing " .
                     "max_children for this application higher than $self->{max_children} ".
                     "in the OpenSRF configuration if this message occurs frequently");
-                $self->check_status(1); # block until child is available
 
-                my $child = pop(@{$self->{idle_list}});
-                push(@{$self->{active_list}}, $child);
-                $self->write_child($child, $msg);
+                if ($from_network) {
+                    $chatty and $logger->debug("server: queuing new message");
+                    push @max_children_msg_queue, $msg;
+                } else {
+                    $chatty and $logger->debug("server: re-queuing old message");
+                    unshift @max_children_msg_queue, $msg;
+                }
+
+                $logger->warn("server: backlog queue size is now ". scalar(@max_children_msg_queue));
+
+                if (@max_children_msg_queue < $self->{max_backlog_queue}) {
+                    # We still have room on the queue. Set the wait time to
+                    # 1s, waiting for a drone to be freed up and reprocess
+                    # this (and any other) queued messages.
+                    $wait_time = 1;
+                    if (!$from_network) {
+                        # If we got here, we had retrieved a message from the queue
+                        # but couldn't process it, and we also hadn't fetched any
+                        # additional messages from the network. Fetch one now;
+                        # otherwise only one message would ever get queued.
+                        $msg = $self->{osrf_handle}->process($wait_time);
+                        if ($msg) {
+                            $chatty and $logger->debug("server: queuing new message after a re-queue");
+                            push @max_children_msg_queue, $msg;
+                        }
+                    }
+                } else {
+
+                    if (!$from_network) {
+                        # The queue is full, and we just requeued a message. Now
+                        # check whether a request is available from the network;
+                        # if so, queue it if a child has become available,
+                        # otherwise drop it.
+                        $msg = $self->{osrf_handle}->process($wait_time);
+                        if ($msg) {
+                            $self->check_status();
+                            if (@{$self->{idle_list}}) {
+                                # child now available, so we'll go ahead and queue it
+                                $chatty and $logger->debug("server: queuing new message after a re-queue with a full queue");
+                                push @max_children_msg_queue, $msg;
+                            } else {
+                                # ok, need to drop this one
+                                my $resp = OpenSRF::DomainObject::oilsMessage->new();
+                                $resp->type('STATUS');
+                                $resp->payload(
+                                    OpenSRF::DomainObject::oilsMethodException->new(
+                                        status => "Service unavailable: no available children and backlog queue at limit",
+                                        statusCode => STATUS_SERVICEUNAVAILABLE
+                                    )
+                                );
+                                $resp->threadTrace(1);
+
+                                $logger->set_osrf_xid($msg->osrf_xid);
+                                $self->{osrf_handle}->send(
+                                    to => $msg->from,
+                                    osrf_xid => $msg->osrf_xid, # Note that this is ignored, which
+                                                                # is why we called $logger->set_osrf_xid above.
+                                                                # We probably don't want that to be necessary
+                                                                # if osrf_xid is explicitly set here, but that'll
+                                                                # be a FIXME for later
+                                    thread => $msg->thread,
+                                    body => OpenSRF::Utils::JSON->perl2JSON([ $resp ])
+                                );
+                                $logger->warn("Backlog queue full for $self->{service}; forced to drop message " .
+                                              $msg->thread . " from " . $msg->from);
+                            }
+                        }
+                    }
+                }
             }
 
         } else {
@@ -219,7 +296,7 @@ sub run {
 sub perform_idle_maintenance {
     my $self = shift;
 
-    $chatty and $logger->internal(
+    $chatty and $logger->internal(sub{return
         sprintf(
             "server: %d idle, %d active, %d min_spare, %d max_spare in idle maintenance",
             scalar(@{$self->{idle_list}}), 
@@ -227,7 +304,7 @@ sub perform_idle_maintenance {
             $self->{min_spare_children},
             $self->{max_spare_children}
         )
-    );
+    });
 
     # spawn 1 spare child per maintenance loop if necessary
     if( $self->{min_spare_children} and
@@ -303,6 +380,19 @@ sub write_child {
         $self->{sig_pipe} = 0;
         local $SIG{'PIPE'} = sub { $self->{sig_pipe} = 1; };
 
+        # In rare cases a child can die between creation and first
+        # write, typically a result of a jabber connect error.  Before
+        # sending data to each child, confirm it's still alive.  If it's
+        # not, log the error and drop the message to prevent the parent
+        # process from dying.
+        # When a child dies, all of its attributes are deleted,
+        # so the lack of a pid means the child is dead.
+        if (!$child->{pid}) {
+            $logger->error("server: child is dead in write_child(). ".
+                "unable to send message: $xml");
+            return; # avoid syswrite crash
+        }
+
         # send message to child data pipe
         syswrite($child->{pipe_to_child}, $write_size . $xml);
 
@@ -347,7 +437,7 @@ sub check_status {
 
     return unless @pids;
 
-    $chatty and $logger->internal("server: ".scalar(@pids)." children reporting for duty: (@pids)");
+    $chatty and $logger->internal(sub{return "server: ".scalar(@pids)." children reporting for duty: (@pids)" });
 
     my $child;
     my @new_actives;
@@ -363,9 +453,9 @@ sub check_status {
 
     $self->{active_list} = [@new_actives];
 
-    $chatty and $logger->internal(sprintf(
+    $chatty and $logger->internal(sub{return sprintf(
         "server: %d idle and %d active children after status update",
-            scalar(@{$self->{idle_list}}), scalar(@{$self->{active_list}})));
+            scalar(@{$self->{idle_list}}), scalar(@{$self->{active_list}})) });
 
     # some children just went from active to idle. let's see 
     # if any of them need to be killed from a previous sighup.
@@ -419,9 +509,9 @@ sub reap_children {
 
     $self->spawn_children unless $shutdown;
 
-    $chatty and $logger->internal(sprintf(
+    $chatty and $logger->internal(sub{return sprintf(
         "server: %d idle and %d active children after reap_children",
-            scalar(@{$self->{idle_list}}), scalar(@{$self->{active_list}})));
+            scalar(@{$self->{idle_list}}), scalar(@{$self->{active_list}})) });
 }
 
 # ----------------------------------------------------------------
@@ -465,7 +555,7 @@ sub spawn_child {
             push(@{$self->{idle_list}}, $child);
         }
 
-        $chatty and $logger->internal("server: server spawned child $child with ".$self->{num_children}." total children");
+        $chatty and $logger->internal(sub{return "server: server spawned child $child with ".$self->{num_children}." total children" });
 
         return $child;
 
@@ -477,7 +567,7 @@ sub spawn_child {
 
         if($self->{stderr_log}) {
 
-            $chatty and $logger->internal("server: redirecting STDERR to " . $self->{stderr_log});
+            $chatty and $logger->internal(sub{return "server: redirecting STDERR to " . $self->{stderr_log} });
 
             close STDERR;
             unless( open(STDERR, '>>' . $self->{stderr_log}) ) {
@@ -640,10 +730,16 @@ sub run {
             OpenSRF::Transport::SlimJabber::XMPPMessage->new(xml => $data)
         );
 
-        $self->keepalive_loop($session);
+        my $recycle = $self->keepalive_loop($session);
 
         last if ++$self->{num_requests} == $self->{parent}->{max_requests};
 
+        if ($recycle) {
+            $chatty && $logger->internal(
+                "server: child exiting early on force_recycle");
+            last;
+        }
+
         # Tell the parent process we are available to process requests
         $self->send_status;
 
@@ -759,7 +855,15 @@ sub keepalive_loop {
     }
 
     $chatty and $logger->internal("server: child done with request(s)");
+
+    # Capture the recycle option value before it's clobbered.
+    # The option may be set at any point along the life of the 
+    # session.  Once set, it remains set unless 
+    # $session->force_recycle(0) is explicitly called.
+    my $recycle = $session->force_recycle;
+
     $session->kill_me;
+    return $recycle;
 }
 
 # ----------------------------------------------------------------