$self->{routers} = []; # list of registered routers
$self->{active_list} = []; # list of active children
$self->{idle_list} = []; # list of idle children
+ $self->{sighup_pending} = [];
$self->{pid_map} = {}; # map of child pid to child for cleaner access
$self->{sig_pipe} = 0; # true if last syswrite failed
if $self->{stderr_log_path};
$self->{min_spare_children} ||= 0;
+ $self->{max_backlog_queue} ||= 1000;
$self->{max_spare_children} = $self->{min_spare_children} + 1 if
$self->{max_spare_children} and
sub cleanup {
my $self = shift;
my $no_exit = shift;
+ my $graceful = shift;
$logger->info("server: shutting down and cleaning up...");
+ # de-register routers
+ $self->unregister_routers;
+
+ if ($graceful) {
+ # graceful shutdown waits for all active
+ # children to complete their in-process tasks.
+
+ while (@{$self->{active_list}}) {
+ $logger->info("server: graceful shutdown with ".
+ @{$self->{active_list}}." active children...");
+
+ # block until a child is becomes available
+ $self->check_status(1);
+ }
+ $logger->info("server: all clear for graceful shutdown");
+ }
+
# don't get sidetracked by signals while we're cleaning up.
# it could result in unexpected behavior with list traversal
$SIG{CHLD} = 'IGNORE';
$self->kill_child($_) for
(@{$self->{idle_list}}, @{$self->{active_list}});
- # de-register routers
- $self->unregister_routers;
-
$self->{osrf_handle}->disconnect;
# clean up our dead children
}
# ----------------------------------------------------------------
+# SIGHUP handler. Kill all idle children. Copy list of active
+# children into sighup_pending list for later cleanup.
+# ----------------------------------------------------------------
+sub handle_sighup {
+ my $self = shift;
+ $logger->info("server: caught SIGHUP; reloading children");
+
+ # reload the opensrf config
+ # note: calling ::Config->load() results in ever-growing
+ # package names, which eventually causes an exception
+ OpenSRF::Utils::Config->current->_load(
+ force => 1,
+ config_file => OpenSRF::Utils::Config->current->FILE
+ );
+
+ # force-reload the logger config
+ OpenSRF::Utils::Logger::set_config(1);
+
+ # copy active list into pending list for later cleanup
+ $self->{sighup_pending} = [ @{$self->{active_list}} ];
+
+ # idle_list will be modified as children are reaped.
+ my @idle = @{$self->{idle_list}};
+
+ # idle children are the reaper's plaything
+ $self->kill_child($_) for @idle;
+}
+
+# ----------------------------------------------------------------
# Waits on the jabber socket for inbound data from the router.
# Each new message is passed off to a child process for handling.
# At regular intervals, wake up for min/max spare child maintenance
sub run {
my $self = shift;
- $logger->set_service($self->{service});
+ $logger->set_service($self->{service});
- $SIG{$_} = sub { $self->cleanup; } for (qw/INT TERM QUIT/);
+ $SIG{$_} = sub { $self->cleanup; } for (qw/INT QUIT/);
+ $SIG{TERM} = sub { $self->cleanup(0, 1); };
$SIG{CHLD} = sub { $self->reap_children(); };
+ $SIG{HUP} = sub { $self->handle_sighup(); };
+ $SIG{USR1} = sub { $self->unregister_routers; };
+ $SIG{USR2} = sub { $self->register_routers; };
$self->spawn_children;
$self->build_osrf_handle;
$self->register_routers;
my $wait_time = 1;
+ my @max_children_msg_queue;
+
# main server loop
while(1) {
+ my $from_network = 0;
$self->check_status;
$self->{child_died} = 0;
- my $msg = $self->{osrf_handle}->process($wait_time);
+ my $msg = shift(@max_children_msg_queue);
+
+ # no pending message, so wait for the next one forever
+ $from_network = $wait_time = -1 if (!$msg);
+ $msg ||= $self->{osrf_handle}->process($wait_time);
# we woke up for any reason, reset the wait time to allow
# for idle maintenance as necessary
if($msg) {
- if(my $child = pop(@{$self->{idle_list}})) {
+ if ($msg->type and $msg->type eq 'error') {
+ $logger->info("server: Listener received an XMPP error ".
+ "message. Likely a bounced message. sender=".$msg->from);
+
+ } elsif(my $child = pop(@{$self->{idle_list}})) {
# we have an idle child to handle the request
$chatty and $logger->internal("server: passing request to idle child $child");
$logger->warn("server: no children available, waiting... consider increasing " .
"max_children for this application higher than $self->{max_children} ".
"in the OpenSRF configuration if this message occurs frequently");
- $self->check_status(1); # block until child is available
- my $child = pop(@{$self->{idle_list}});
- push(@{$self->{active_list}}, $child);
- $self->write_child($child, $msg);
+ if ($from_network) {
+ push @max_children_msg_queue, $msg;
+ } else {
+ unshift @max_children_msg_queue, $msg;
+ }
+
+ if (@max_children_msg_queue < $self->{max_backlog_queue}) {
+ # We still have room on the queue. Set the wait time to
+ # 1s, waiting for a drone to be freed up and reprocess
+ # this (and any other) queued messages.
+ $wait_time = 1;
+ if (!$from_network) {
+ # if we got here, we had retrieved a message from the queue
+ # but couldn't process it... but also hadn't fetched any
+ # additional messages from the network. Doing so now,
+ # as otherwise only one message will ever get queued
+ $msg = $self->{osrf_handle}->process($wait_time);
+ if ($msg) {
+ $chatty and $logger->debug("server: queuing new message after a re-queue");
+ push @max_children_msg_queue, $msg;
+ }
+ }
+ } else {
+ # We'll just have to wait
+ $self->check_status(1); # block until child is available
+ }
}
} else {
$self->{sig_pipe} = 0;
local $SIG{'PIPE'} = sub { $self->{sig_pipe} = 1; };
+ # In rare cases a child can die between creation and first
+ # write, typically a result of a jabber connect error. Before
+ # sending data to each child, confirm it's still alive. If it's
+ # not, log the error and drop the message to prevent the parent
+ # process from dying.
+ # When a child dies, all of its attributes are deleted,
+ # so the lack of a pid means the child is dead.
+ if (!$child->{pid}) {
+ $logger->error("server: child is dead in write_child(). ".
+ "unable to send message: $xml");
+ return; # avoid syswrite crash
+ }
+
# send message to child data pipe
syswrite($child->{pipe_to_child}, $write_size . $xml);
return unless @{$self->{active_list}};
- my $read_set = IO::Select->new;
- $read_set->add($_->{pipe_to_child}) for @{$self->{active_list}};
-
my @pids;
while (1) {
# if can_read or sysread is interrupted while bloking, go back and
# wait again until we have at least 1 free child
+ # refresh the read_set handles in case we lost a child in the previous iteration
+ my $read_set = IO::Select->new;
+ $read_set->add($_->{pipe_to_child}) for @{$self->{active_list}};
+
if(my @handles = $read_set->can_read(($block) ? undef : 0)) {
my $pid = '';
for my $pipe (@handles) {
$chatty and $logger->internal(sprintf(
"server: %d idle and %d active children after status update",
scalar(@{$self->{idle_list}}), scalar(@{$self->{active_list}})));
+
+ # some children just went from active to idle. let's see
+ # if any of them need to be killed from a previous sighup.
+
+ for my $child (@{$self->{sighup_pending}}) {
+ if (grep {$_ == $child->{pid}} @pids) {
+
+ $chatty and $logger->internal(
+ "server: killing previously-active ".
+ "child after receiving SIGHUP: $child");
+
+ # remove the pending child
+ $self->{sighup_pending} = [
+ grep {$_->{pid} != $child->{pid}}
+ @{$self->{sighup_pending}}
+ ];
+
+ # kill the pending child
+ $self->kill_child($child)
+ }
+ }
}
# ----------------------------------------------------------------
$chatty and $logger->internal(sprintf(
"server: %d idle and %d active children after reap_children",
scalar(@{$self->{idle_list}}), scalar(@{$self->{active_list}})));
-
}
# ----------------------------------------------------------------
} else { # child process
- $SIG{$_} = 'DEFAULT' for (qw/INT TERM QUIT HUP/);
+ # recover default handling for any signal whose handler
+ # may have been adopted from the parent process.
+ $SIG{$_} = 'DEFAULT' for qw/TERM INT QUIT HUP CHLD USR1 USR2/;
if($self->{stderr_log}) {
my $self = shift;
return unless $self->{osrf_handle}->tcp_connected;
- for my $router (@{$self->{routers}}) {
+ for my $router (@{$self->{routers}}) {
$logger->info("server: disconnecting from router $router");
$self->{osrf_handle}->send(
to => $router,
my $service = $self->{parent}->{service};
$0 = "OpenSRF Drone [$service]";
OpenSRF::Transport::PeerHandle->construct($service);
- OpenSRF::Application->application_implementation->child_init
- if (OpenSRF::Application->application_implementation->can('child_init'));
+ OpenSRF::Application->application_implementation->child_init
+ if (OpenSRF::Application->application_implementation->can('child_init'));
}
# ----------------------------------------------------------------
OpenSRF::Transport::SlimJabber::XMPPMessage->new(xml => $data)
);
- $self->keepalive_loop($session);
+ my $recycle = $self->keepalive_loop($session);
last if ++$self->{num_requests} == $self->{parent}->{max_requests};
+ if ($recycle) {
+ $chatty && $logger->internal(
+ "server: child exiting early on force_recycle");
+ last;
+ }
+
# Tell the parent process we are available to process requests
$self->send_status;
$chatty and $logger->internal("server: child process shutting down after reaching max_requests");
- OpenSRF::Application->application_implementation->child_exit
- if (OpenSRF::Application->application_implementation->can('child_exit'));
+ OpenSRF::Application->application_implementation->child_exit
+ if (OpenSRF::Application->application_implementation->can('child_exit'));
}
# ----------------------------------------------------------------
sub wait_for_request {
my $self = shift;
- my $data = '';
- my $buf_size = 4096;
- my $nonblock = 0;
+ my $data = ''; # final request data
+ my $buf_size = 4096; # default linux pipe_buf (atomic window, not total size)
my $read_pipe = $self->{pipe_to_parent};
- my $data_size;
- my $total_read;
- my $first_read = 1;
+ my $bytes_needed; # size of the data we are about to receive
+ my $bytes_recvd; # number of bytes read so far
+ my $first_read = 1; # true for first loop iteration
+ my $read_error;
while (1) {
$self->set_nonblock($read_pipe);
while (1) {
- # pull as much data from the pipe as possible
+ # read all of the available data
my $buf = '';
- my $bytes_read = sysread($self->{pipe_to_parent}, $buf, $buf_size);
-
- unless(defined $bytes_read) {
- $logger->error("server: error reading data pipe: $!") unless EAGAIN == $!;
+ my $nbytes = sysread($self->{pipe_to_parent}, $buf, $buf_size);
+
+ unless(defined $nbytes) {
+ if ($! != EAGAIN) {
+ $logger->error("server: error reading data from parent: $!. ".
+ "bytes_needed=$bytes_needed; bytes_recvd=$bytes_recvd; data=$data");
+ $read_error = 1;
+ }
last;
}
- last if $bytes_read <= 0; # no more data available for reading
+ last if $nbytes <= 0; # no more data available for reading
- $total_read += $bytes_read;
+ $bytes_recvd += $nbytes;
$data .= $buf;
}
- # we've read all the data currently available on the pipe.
- # let's see if we're done.
+ $self->set_block($self->{pipe_to_parent});
+ return undef if $read_error;
+ # extract the data size and remove the header from the final data
if ($first_read) {
- # extract the data size and remove the size header
my $wps_size = OpenSRF::Server::WRITE_PIPE_DATA_SIZE;
- $data_size = int(substr($data, 0, $wps_size)) + $wps_size;
+ $bytes_needed = int(substr($data, 0, $wps_size)) + $wps_size;
$data = substr($data, $wps_size);
$first_read = 0;
}
- $self->set_block($self->{pipe_to_parent});
- if ($total_read == $data_size) {
+ if ($bytes_recvd == $bytes_needed) {
# we've read all the data. Nothing left to do
last;
}
+
+ $logger->info("server: child process read all available pipe data. ".
+ "waiting for more data from parent. bytes_needed=$bytes_needed; bytes_recvd=$bytes_recvd");
}
return $data;
}
$chatty and $logger->internal("server: child done with request(s)");
+
+ # Capture the recycle option value before it's clobbered.
+ # The option may be set at any point along the life of the
+ # session. Once set, it remains set unless
+ # $session->force_recycle(0) is explicitly called.
+ my $recycle = $session->force_recycle;
+
$session->kill_me;
+ return $recycle;
}
# ----------------------------------------------------------------