LP#1703411: Move OpenSRF XMPP attrs to subelement
[opensrf-equinox.git] / src / perl / lib / OpenSRF / Transport / SlimJabber / XMPPReader.pm
index 1e60f70..737cf96 100644 (file)
@@ -5,6 +5,7 @@ use Fcntl qw(F_GETFL F_SETFL O_NONBLOCK);
 use Time::HiRes qw/time/;
 use OpenSRF::Transport::SlimJabber::XMPPMessage;
 use OpenSRF::Utils::Logger qw/$logger/;
+use OpenSRF::EX;
 
 # -----------------------------------------------------------
 # Connect, disconnect, and authentication messsage templates
@@ -165,6 +166,11 @@ sub tcp_connected {
 # -----------------------------------------------------------
 sub send {
     my($self, $xml) = @_;
+        
+    local $SIG{'PIPE'} = sub {
+        $logger->error("Disconnected from Jabber server, exiting immediately");
+        exit(99);
+    };
     $self->{socket}->print($xml);
 }
 
@@ -203,18 +209,58 @@ sub wait {
     # build the select readset
     my $infile = '';
     vec($infile, $socket->fileno, 1) = 1;
-    return undef unless select($infile, undef, undef, $timeout);
+
+    my $nfound;
+    if (!OpenSRF->OSRF_APACHE_REQUEST_OBJ || $timeout <= 1.0) {
+        $nfound = select($infile, undef, undef, $timeout);
+    } else {
+        $timeout -= 1.0;
+        for (
+            my $sleep = 1.0;
+            $timeout >= 0.0;
+            do {
+                $sleep = $timeout < 1.0 ? $timeout : 1.0;
+                $timeout -= 1.0;
+            }
+        ) {
+            $nfound = select($infile, undef, undef, $sleep);
+            last if $nfound;
+            if (
+                OpenSRF->OSRF_APACHE_REQUEST_OBJ &&
+                OpenSRF->OSRF_APACHE_REQUEST_OBJ->connection->aborted
+            ) {
+                # Should this be more severe? Die or throw error?
+                $logger->warn("Upstream Apache client disconnected, aborting.");
+                last;
+            };
+        }
+    }
+    return undef if !$nfound or $nfound == -1;
 
     # now slurp the data off the socket
     my $buf;
     my $read_size = 1024;
-    while(my $n = sysread($socket, $buf, $read_size)) {
+    my $nonblock = 0;
+    my $nbytes;
+    my $first_read = 1;
+
+    while($nbytes = sysread($socket, $buf, $read_size)) {
         $self->{parser}->parse_more($buf) if $buf;
-        if($n < $read_size or $self->peek_msg) {
-            set_block($socket);
+        if($nbytes < $read_size or $self->peek_msg) {
+            set_block($socket) if $nonblock;
             last;
         }
-        set_nonblock($socket);
+        set_nonblock($socket) unless $nonblock;
+        $nonblock = 1;
+        $first_read = 0;
+    }
+
+    if ($first_read and defined $nbytes and $nbytes == 0) {
+        # if the first read on an active socket is 0 bytes, 
+        # the socket has been disconnected from the remote end. 
+        $self->{stream_state} = DISCONNECTED;
+        $logger->error("Disconnected from Jabber server");
+        throw OpenSRF::EX::Jabber("Disconnected from Jabber server");
     }
 
     return $self->next_msg;
@@ -260,8 +306,13 @@ sub start_element {
 
         my $msg = $self->{message};
         $msg->{to} = $attrs{'to'};
-        $msg->{from} = $attrs{router_from} if $attrs{router_from};
-        $msg->{from} = $attrs{from} unless $msg->{from};
+        $msg->{from} = $attrs{from};
+
+    } elsif($name eq 'opensrf') {
+
+        # These will be authoritative if they exist
+        my $msg = $self->{message};
+        $msg->{from} = $attrs{router_from};
         $msg->{osrf_xid} = $attrs{'osrf_xid'};
         $msg->{type} = $attrs{type};
 
@@ -321,32 +372,22 @@ sub end_element {
     }
 }
 
+
+# read all the data on the jabber socket through the 
+# parser and drop the resulting message
 sub flush_socket {
        my $self = shift;
-       my $socket = $self->socket;
-    return 0 unless $socket and $socket->connected;
-
-    my $flags = fcntl($socket, F_GETFL, 0);
-    fcntl($socket, F_SETFL, $flags | O_NONBLOCK);
+    return 0 unless $self->connected;
 
-    while( my $n = sysread( $socket, my $buf, 8192 ) ) {
-        $logger->debug("flush_socket dropped $n bytes of data");
-        $logger->error("flush_socket dropped data on disconnected socket: $buf")
-            unless($socket->connected);
+    while ($self->wait(0)) {
+        # TODO remove this log line
+        $logger->info("flushing data from socket...");
     }
 
-    fcntl($socket, F_SETFL, $flags);
-    return 0 unless $socket->connected;
-    return 1;
+    return $self->connected;
 }
 
 
 
-
-
 1;
 
-
-
-
-