use OpenSRF::Transport::PeerHandle;
use OpenSRF::Utils::SettingsClient;
use OpenSRF::Utils::Logger qw($logger);
+use OpenSRF::DomainObject::oilsResponse qw/:status/;
use OpenSRF::Transport::SlimJabber::Client;
use Encode;
use POSIX qw/:sys_wait_h :errno_h/;
if $self->{stderr_log_path};
$self->{min_spare_children} ||= 0;
+ $self->{max_backlog_queue} ||= 1000;
$self->{max_spare_children} = $self->{min_spare_children} + 1 if
$self->{max_spare_children} and
$self->register_routers;
my $wait_time = 1;
+ my @max_children_msg_queue;
+
# main server loop
while(1) {
+ my $from_network = 0;
$self->check_status;
$self->{child_died} = 0;
- my $msg = $self->{osrf_handle}->process($wait_time);
+ my $msg = shift(@max_children_msg_queue);
+
+ # no pending message, so wait for the next one forever
+ $from_network = $wait_time = -1 if (!$msg);
+ $msg ||= $self->{osrf_handle}->process($wait_time);
+
+ !$from_network and $chatty and $logger->debug("server: attempting to process previously queued message");
+ $from_network and $chatty and $logger->internal("server: no queued messages, processing due to network or signal");
# we woke up for any reason, reset the wait time to allow
# for idle maintenance as necessary
$logger->warn("server: no children available, waiting... consider increasing " .
"max_children for this application higher than $self->{max_children} ".
"in the OpenSRF configuration if this message occurs frequently");
- $self->check_status(1); # block until child is available
- my $child = pop(@{$self->{idle_list}});
- push(@{$self->{active_list}}, $child);
- $self->write_child($child, $msg);
+ if ($from_network) {
+ $chatty and $logger->debug("server: queuing new message");
+ push @max_children_msg_queue, $msg;
+ } else {
+ $chatty and $logger->debug("server: re-queuing old message");
+ unshift @max_children_msg_queue, $msg;
+ }
+
+ $logger->warn("server: backlog queue size is now ". scalar(@max_children_msg_queue));
+
+ if (@max_children_msg_queue < $self->{max_backlog_queue}) {
+ # We still have room on the queue. Set the wait time to
+ # 1s, waiting for a drone to be freed up and reprocess
+ # this (and any other) queued messages.
+ $wait_time = 1;
+ if (!$from_network) {
+ # if we got here, we had retrieved a message from the queue
+ # but couldn't process it... but also hadn't fetched any
+ # additional messages from the network. Doing so now,
+ # as otherwise only one message will ever get queued
+ $msg = $self->{osrf_handle}->process($wait_time);
+ if ($msg) {
+ $chatty and $logger->debug("server: queuing new message after a re-queue");
+ push @max_children_msg_queue, $msg;
+ }
+ }
+ } else {
+
+ if (!$from_network) {
+ # The queue is full, and we just requeued a message. We'll
+ # now see if there is a request available from the network;
+ # if so, we'll see if a child is available again or else
+ # drop it
+ $msg = $self->{osrf_handle}->process($wait_time);
+ if ($msg) {
+ $self->check_status();
+ if (@{$self->{idle_list}}) {
+ # child now available, so we'll go ahead and queue it
+ $chatty and $logger->debug("server: queuing new message after a re-queue with a full queue");
+ push @max_children_msg_queue, $msg;
+ } else {
+ # ok, need to drop this one
+ my $resp = OpenSRF::DomainObject::oilsMessage->new();
+ $resp->type('STATUS');
+ $resp->payload(
+ OpenSRF::DomainObject::oilsMethodException->new(
+ status => "Service unavailable: no available children and backlog queue at limit",
+ statusCode => STATUS_SERVICEUNAVAILABLE
+ )
+ );
+ $resp->threadTrace(1);
+
+ $logger->set_osrf_xid($msg->osrf_xid);
+ $self->{osrf_handle}->send(
+ to => $msg->from,
+ osrf_xid => $msg->osrf_xid, # Note that this is ignored, which
+ # is why we called $logger->set_osrf_xid above.
+ # We probably don't want that to be necessary
+ # if osrf_xid is explicitly set here, but that'll
+ # be a FIXME for later
+ thread => $msg->thread,
+ body => OpenSRF::Utils::JSON->perl2JSON([ $resp ])
+ );
+ $logger->warn("Backlog queue full for $self->{service}; forced to drop message " .
+ $msg->thread . " from " . $msg->from);
+ }
+ }
+ }
+ }
}
} else {
sub perform_idle_maintenance {
my $self = shift;
- $chatty and $logger->internal(
+ $chatty and $logger->internal(sub{return
sprintf(
"server: %d idle, %d active, %d min_spare, %d max_spare in idle maintenance",
scalar(@{$self->{idle_list}}),
$self->{min_spare_children},
$self->{max_spare_children}
)
- );
+ });
# spawn 1 spare child per maintenance loop if necessary
if( $self->{min_spare_children} and
$self->{sig_pipe} = 0;
local $SIG{'PIPE'} = sub { $self->{sig_pipe} = 1; };
+ # In rare cases a child can die between creation and first
+ # write, typically a result of a jabber connect error. Before
+ # sending data to each child, confirm it's still alive. If it's
+ # not, log the error and drop the message to prevent the parent
+ # process from dying.
+ # When a child dies, all of its attributes are deleted,
+ # so the lack of a pid means the child is dead.
+ if (!$child->{pid}) {
+ $logger->error("server: child is dead in write_child(). ".
+ "unable to send message: $xml");
+ return; # avoid syswrite crash
+ }
+
# send message to child data pipe
syswrite($child->{pipe_to_child}, $write_size . $xml);
return unless @pids;
- $chatty and $logger->internal("server: ".scalar(@pids)." children reporting for duty: (@pids)");
+ $chatty and $logger->internal(sub{return "server: ".scalar(@pids)." children reporting for duty: (@pids)" });
my $child;
my @new_actives;
$self->{active_list} = [@new_actives];
- $chatty and $logger->internal(sprintf(
+ $chatty and $logger->internal(sub{return sprintf(
"server: %d idle and %d active children after status update",
- scalar(@{$self->{idle_list}}), scalar(@{$self->{active_list}})));
+ scalar(@{$self->{idle_list}}), scalar(@{$self->{active_list}})) });
# some children just went from active to idle. let's see
# if any of them need to be killed from a previous sighup.
$self->spawn_children unless $shutdown;
- $chatty and $logger->internal(sprintf(
+ $chatty and $logger->internal(sub{return sprintf(
"server: %d idle and %d active children after reap_children",
- scalar(@{$self->{idle_list}}), scalar(@{$self->{active_list}})));
+ scalar(@{$self->{idle_list}}), scalar(@{$self->{active_list}})) });
}
# ----------------------------------------------------------------
push(@{$self->{idle_list}}, $child);
}
- $chatty and $logger->internal("server: server spawned child $child with ".$self->{num_children}." total children");
+ $chatty and $logger->internal(sub{return "server: server spawned child $child with ".$self->{num_children}." total children" });
return $child;
if($self->{stderr_log}) {
- $chatty and $logger->internal("server: redirecting STDERR to " . $self->{stderr_log});
+ $chatty and $logger->internal(sub{return "server: redirecting STDERR to " . $self->{stderr_log} });
close STDERR;
unless( open(STDERR, '>>' . $self->{stderr_log}) ) {
OpenSRF::Transport::SlimJabber::XMPPMessage->new(xml => $data)
);
- $self->keepalive_loop($session);
+ my $recycle = $self->keepalive_loop($session);
last if ++$self->{num_requests} == $self->{parent}->{max_requests};
+ if ($recycle) {
+ $chatty && $logger->internal(
+ "server: child exiting early on force_recycle");
+ last;
+ }
+
# Tell the parent process we are available to process requests
$self->send_status;
}
$chatty and $logger->internal("server: child done with request(s)");
+
+ # Capture the recycle option value before it's clobbered.
+ # The option may be set at any point along the life of the
+ # session. Once set, it remains set unless
+ # $session->force_recycle(0) is explicitly called.
+ my $recycle = $session->force_recycle;
+
$session->kill_me;
+ return $recycle;
}
# ----------------------------------------------------------------