use Time::HiRes qw/time/;
use OpenSRF::Transport::SlimJabber::XMPPMessage;
use OpenSRF::Utils::Logger qw/$logger/;
+use OpenSRF::EX;
# -----------------------------------------------------------
# Connect, disconnect, and authentication messsage templates
# -----------------------------------------------------------
sub send {
my($self, $xml) = @_;
+
+ local $SIG{'PIPE'} = sub {
+ $logger->error("Disconnected from Jabber server, exiting immediately");
+ exit(99);
+ };
$self->{socket}->print($xml);
}
# build the select readset
my $infile = '';
vec($infile, $socket->fileno, 1) = 1;
- return undef unless select($infile, undef, undef, $timeout);
+
+ my $nfound;
+ if (!OpenSRF->OSRF_APACHE_REQUEST_OBJ || $timeout <= 1.0) {
+ $nfound = select($infile, undef, undef, $timeout);
+ } else {
+ $timeout -= 1.0;
+ for (
+ my $sleep = 1.0;
+ $timeout >= 0.0;
+ do {
+ $sleep = $timeout < 1.0 ? $timeout : 1.0;
+ $timeout -= 1.0;
+ }
+ ) {
+ $nfound = select($infile, undef, undef, $sleep);
+ last if $nfound;
+ if (
+ OpenSRF->OSRF_APACHE_REQUEST_OBJ &&
+ OpenSRF->OSRF_APACHE_REQUEST_OBJ->connection->aborted
+ ) {
+ # Should this be more severe? Die or throw error?
+ $logger->warn("Upstream Apache client disconnected, aborting.");
+ last;
+ };
+ }
+ }
+ return undef if !$nfound or $nfound == -1;
# now slurp the data off the socket
my $buf;
my $read_size = 1024;
- while(my $n = sysread($socket, $buf, $read_size)) {
+ my $nonblock = 0;
+ my $nbytes;
+ my $first_read = 1;
+
+ while($nbytes = sysread($socket, $buf, $read_size)) {
$self->{parser}->parse_more($buf) if $buf;
- if($n < $read_size or $self->peek_msg) {
- set_block($socket);
+ if($nbytes < $read_size or $self->peek_msg) {
+ set_block($socket) if $nonblock;
last;
}
- set_nonblock($socket);
+ set_nonblock($socket) unless $nonblock;
+ $nonblock = 1;
+ $first_read = 0;
+ }
+
+ if ($first_read and defined $nbytes and $nbytes == 0) {
+ # if the first read on an active socket is 0 bytes,
+ # the socket has been disconnected from the remote end.
+ $self->{stream_state} = DISCONNECTED;
+ $logger->error("Disconnected from Jabber server");
+ throw OpenSRF::EX::Jabber("Disconnected from Jabber server");
}
return $self->next_msg;
my $msg = $self->{message};
$msg->{to} = $attrs{'to'};
- $msg->{from} = $attrs{router_from} if $attrs{router_from};
- $msg->{from} = $attrs{from} unless $msg->{from};
+ $msg->{from} = $attrs{from};
+
+ } elsif($name eq 'opensrf') {
+
+ # These will be authoritative if they exist
+ my $msg = $self->{message};
+ $msg->{from} = $attrs{router_from};
$msg->{osrf_xid} = $attrs{'osrf_xid'};
$msg->{type} = $attrs{type};
}
}
+
+# read all the data on the jabber socket through the
+# parser and drop the resulting message
sub flush_socket {
my $self = shift;
- my $socket = $self->socket;
- return 0 unless $socket and $socket->connected;
-
- my $flags = fcntl($socket, F_GETFL, 0);
- fcntl($socket, F_SETFL, $flags | O_NONBLOCK);
+ return 0 unless $self->connected;
- while( my $n = sysread( $socket, my $buf, 8192 ) ) {
- $logger->debug("flush_socket dropped $n bytes of data");
- $logger->error("flush_socket dropped data on disconnected socket: $buf")
- unless($socket->connected);
+ while ($self->wait(0)) {
+ # TODO remove this log line
+ $logger->info("flushing data from socket...");
}
- fcntl($socket, F_SETFL, $flags);
- return 0 unless $socket->connected;
- return 1;
+ return $self->connected;
}
-
-
1;
-
-
-
-