LP#1824184: Change potentially slow log statements to subroutines
[opensrf-equinox.git] / src / perl / lib / OpenSRF / AppSession.pm
index d450159..1356178 100644 (file)
@@ -10,8 +10,10 @@ use OpenSRF::Utils::Config;
 use OpenSRF::EX;
 use OpenSRF;
 use Exporter;
+use Encode;
 use base qw/Exporter OpenSRF/;
 use Time::HiRes qw( time usleep );
+use POSIX ();
 use warnings;
 use strict;
 
@@ -22,6 +24,35 @@ our %EXPORT_TAGS = ( state => [ qw/CONNECTING INIT_CONNECTED CONNECTED DISCONNEC
 
 my $logger = "OpenSRF::Utils::Logger";
 my $_last_locale = 'en-US';
+our $current_ingress = 'opensrf';
+
+# Get/set the locale used by all new client sessions 
+# for the current process.  This is primarily useful 
+# for clients that wish to make a series of opensrf 
+# calls and don't wish to set the locale for each new 
+# AppSession object.
+#
+# The caller should reset the locale when done using 
+# reset_locale(), as the locale will otherwise persist 
+# for the current process until set/reset again.
+#
+# This is not for SERVER processes, since they 
+# adopt the locale of their respective callers.
+sub default_locale {
+    my ($class, $locale) = @_;
+    $_last_locale = $locale if $locale;
+    return $_last_locale;
+}
+sub reset_locale {
+    my ($class) = @_;
+    return $_last_locale = 'en-US';
+}
+
+sub ingress {
+    my ($class, $ingress) = @_;
+    $current_ingress = $ingress if $ingress;
+    return $current_ingress;
+}
 
 our %_CACHE;
 our @_RESEND_QUEUE;
@@ -115,6 +146,7 @@ sub server_build {
 
        my $self = bless { recv_queue  => [],
                           request_queue  => [],
+               force_recycle => 0,
                           requests  => 0,
                           session_data  => {},
                           callbacks  => {},
@@ -197,6 +229,15 @@ sub stateless {
        return $self->{stateless};
 }
 
+# When true, indicates the server drone should be killed (recycled)
+# after the current session has completed.  This overrides the
+# configured max_request value.
+sub force_recycle {
+    my ($self, $force) = @_;
+    $self->{force_recycle} = $force if defined $force;
+    return $self->{force_recycle};
+}
+
 # When we're a client and we want to connect to a remote service
 sub create {
        my $class = shift;
@@ -382,7 +423,7 @@ sub kill_me {
        }
 
        $self->disconnect;
-       $logger->transport( "AppSession killing self: " . $self->session_id(), DEBUG );
+       $logger->transport(sub{return "AppSession killing self: " . $self->session_id() }, DEBUG );
        delete $_CACHE{$self->session_id};
        delete($$self{$_}) for (keys %$self);
 }
@@ -499,12 +540,14 @@ sub send {
 
         my $locale = $self->session_locale;
                $msg->sender_locale($locale) if ($locale);
+
+               $msg->sender_ingress($current_ingress);
        
                push @doc, $msg;
 
        
-               $logger->debug( "AppSession sending ".$msg->type." to ".$self->remote_id.
-                       " with threadTrace [".$msg->threadTrace."]");
+               $logger->debug(sub{return "AppSession sending ".$msg->type." to ".$self->remote_id.
+                       " with threadTrace [".$msg->threadTrace."]" });
 
        }
        
@@ -700,7 +743,7 @@ sub recv {
                }
        }
 
-       #$logger->debug( ref($self). " recv_queue before wait: " . $self->_print_queue(), INTERNAL );
+       #$logger->debug(sub{return ref($self). " recv_queue before wait: " . $self->_print_queue() }, INTERNAL );
 
        if( exists( $args{timeout} ) ) {
                $args{timeout} = int($args{timeout});
@@ -745,7 +788,7 @@ sub recv {
                last if (scalar(@list) >= $args{count});
        }
 
-       $logger->debug( "Number of matched responses: " . @list, DEBUG );
+       $logger->debug(sub{return "Number of matched responses: " . @list }, DEBUG );
        $self->queue_wait(0); # check for statuses
        
        return $list[0] if (!wantarray);
@@ -766,7 +809,7 @@ sub push_resend {
 
 sub flush_resend {
        my $self = shift;
-       $logger->debug( "Resending..." . @_RESEND_QUEUE, INTERNAL );
+       $logger->debug(sub{return "Resending..." . @_RESEND_QUEUE }, INTERNAL );
        while ( my $req = shift @OpenSRF::AppSession::_RESEND_QUEUE ) {
                $req->resend unless $req->complete;
        }
@@ -827,15 +870,17 @@ sub new {
                        payload                 => $payload,
                        complete                => 0,
                        resp_count              => 0,
-                       max_chunk_count         => 0,
-                       current_chunk_count     => 0,
+                       max_bundle_count        => 0,
+                       current_bundle_count=> 0,
                        max_chunk_size          => 0,
-                       current_chunk_size      => 0,
-                       current_chunk           => [],
+                       max_bundle_size         => 0,
+                       current_bundle_size     => 0,
+                       current_bundle          => [],
                        timeout_reset           => 0,
                        recv_timeout            => 30,
                        remaining_recv_timeout  => 30,
                        recv_queue              => [],
+                       part_recv_buffer=> '',
        };
 
        bless $self => $class;
@@ -845,11 +890,18 @@ sub new {
        return $self;
 }
 
-sub max_chunk_count {
+sub max_bundle_count {
+       my $self = shift;
+       my $value = shift;
+       $self->{max_bundle_count} = $value if (defined($value));
+       return $self->{max_bundle_count};
+}
+
+sub max_bundle_size {
        my $self = shift;
        my $value = shift;
-       $self->{max_chunk_count} = $value if (defined($value));
-       return $self->{max_chunk_count};
+       $self->{max_bundle_size} = $value if (defined($value));
+       return $self->{max_bundle_size};
 }
 
 sub max_chunk_size {
@@ -901,9 +953,17 @@ sub complete {
        } else {
                $self->session->queue_wait(0);
        }
+    $self->completing(0) if ($self->{complete});
        return $self->{complete};
 }
 
+sub completing {
+       my $self = shift;
+       my $value = shift;
+       $self->{_completing} = $value if (defined($value));
+       return $self->{_completing};
+}
+
 sub duration {
        my $self = shift;
        $self->wait_complete;
@@ -938,6 +998,18 @@ sub push_queue {
                $self->complete(1);
                #return; eventually...
        }
+
+       if( UNIVERSAL::isa($resp, "OpenSRF::DomainObject::oilsResult::Partial")) {
+               $self->{part_recv_buffer} .= $resp->content;
+               return 1;
+       } elsif( UNIVERSAL::isa($resp, "OpenSRF::DomainObject::oilsResult::PartialComplete")) {
+               if ($self->{part_recv_buffer}) {
+                       $resp = new OpenSRF::DomainObject::oilsResult;
+                       $resp->content( OpenSRF::Utils::JSON->JSON2perl( $self->{part_recv_buffer} ) );
+                       $self->{part_recv_buffer} = '';
+               } 
+       }
+
        push @{ $self->{recv_queue} }, $resp;
 }
 
@@ -956,7 +1028,7 @@ sub payload { return shift()->{payload}; }
 sub resend {
        my $self = shift;
        return unless ($self and $self->session and !$self->complete);
-       OpenSRF::Utils::Logger->debug( "I'm resending the request for threadTrace ". $self->threadTrace, DEBUG);
+       OpenSRF::Utils::Logger->debug(sub{return "I'm resending the request for threadTrace ". $self->threadTrace }, DEBUG);
        return $self->session->send('REQUEST', $self->payload, $self->threadTrace );
 }
 
@@ -981,35 +1053,74 @@ sub respond {
        my $msg = shift;
        return unless ($self and $self->session and !$self->complete);
 
+    my $type = 'RESULT';
        my $response;
-       if (ref($msg) && UNIVERSAL::isa($msg, 'OpenSRF::DomainObject::oilsResult')) {
+       if (ref($msg) && UNIVERSAL::isa($msg, 'OpenSRF::DomainObject::oilsResponse')) {
                $response = $msg;
+        $type = 'STATUS' if UNIVERSAL::isa($response, 'OpenSRF::DomainObject::oilsStatus');
+
        } else {
-               $response = new OpenSRF::DomainObject::oilsResult;
-               $response->content($msg);
-       }
 
-    if ($self->{max_chunk_count} > 0 or $self->{max_chunk_size} > 0) { # we are chunking, and we need to test the size or count
+        if ($self->max_chunk_size > 0) { # we might need to chunk
+            my $str = OpenSRF::Utils::JSON->perl2JSON($msg);
+
+            # XML can add a lot of length to a chunk due to escaping, so we
+            # calculate chunk size based on an XML-escaped version of the message.
+            # Example: If escaping doubles the length of the string then $ratio
+            # will be 0.5 and we'll cut the chunk size for this message in half.
+
+            my $raw_length = length(Encode::encode_utf8($str)); # count bytes
+            my $escaped_length = $raw_length;
+            $escaped_length += 11 * (() = ( $str =~ /"/g)); # 7 \s and "
+            $escaped_length += 4 * (() = ( $str =~ /&/g)); # &
+            $escaped_length += 3 * (() = ( $str =~ /[<>]/g)); # &lt; / &gt;
+
+            my $chunk_size = $self->max_chunk_size;
+
+            if ($escaped_length > $self->max_chunk_size) {
+                $chunk_size = POSIX::floor(($raw_length / $escaped_length) * $self->max_chunk_size);
+            }
+
+            if ($raw_length > $chunk_size) { # send partials ("chunking")
+                my $num_bytes = length(Encode::encode_utf8($str));
+                for (my $i = 0; $i < $num_bytes; $i += $chunk_size) {
+                    $response = new OpenSRF::DomainObject::oilsResult::Partial;
+                    $response->content( substr($str, $i, $chunk_size) );
+                    $self->session->send($type, $response, $self->threadTrace);
+                }
+                # This triggers reconstruction on the remote end
+                $response = new OpenSRF::DomainObject::oilsResult::PartialComplete;
+                return $self->session->send($type, $response, $self->threadTrace);
+            }
+        }
 
-        $self->{current_chunk_size} += OpenSRF::Utils::JSON->perl2JSON($response);
-        push @{$self->{current_chunk}}, $response;  
-        $self->{current_chunk_count}++;
+        # message failed to exceed max chunk size OR chunking disabled
+        $response = new OpenSRF::DomainObject::oilsResult;
+        $response->content($msg);
+    }
 
-        if (
-                ($self->{max_chunk_size}  && $self->{current_chunk_size}  >= $self->{max_chunk_size} ) ||
-                ($self->{max_chunk_count} && $self->{current_chunk_count} >= $self->{max_chunk_count})
+    if ($self->{max_bundle_count} > 0 or $self->{max_bundle_size} > 0) { # we are bundling, and we need to test the size or count
+
+        $self->{current_bundle_size} += length(
+            Encode::encode_utf8(OpenSRF::Utils::JSON->perl2JSON($response)));
+        push @{$self->{current_bundle}}, $type, $response;  
+        $self->{current_bundle_count}++;
+
+        if ( $self->completing ||
+                ($self->{max_bundle_size}  && $self->{current_bundle_size}  >= $self->{max_bundle_size} ) ||
+                ($self->{max_bundle_count} && $self->{current_bundle_count} >= $self->{max_bundle_count})
         ) { # send chunk and reset
-            my $send_res = $self->session->send(( map { ('RESULT', $_) } @{$self->{current_chunk}} ), $self->threadTrace);
-            $self->{current_chunk} = [];
-            $self->{current_chunk_size} = 0;
-            $self->{current_chunk_count} = 0;
+            my $send_res = $self->session->send( @{$self->{current_bundle}}, $self->threadTrace);
+            $self->{current_bundle} = [];
+            $self->{current_bundle_size} = 0;
+            $self->{current_bundle_count} = 0;
             return $send_res;
         } else { # not at a chunk yet, just queue it up
             return $self->session->app_request( $self->threadTrace );
         }
     }
 
-       $self->session->send('RESULT', $response, $self->threadTrace);
+       $self->session->send($type, $response, $self->threadTrace);
 }
 
 sub respond_complete {
@@ -1017,22 +1128,15 @@ sub respond_complete {
        my $msg = shift;
        return unless ($self and $self->session and !$self->complete);
 
-       my $response;
-       if (ref($msg) && UNIVERSAL::isa($msg, 'OpenSRF::DomainObject::oilsResult')) {
-               $response = $msg;
-       } else {
-               $response = new OpenSRF::DomainObject::oilsResult;
-               $response->content($msg);
-       }
-
-    push @{$self->{current_chunk}}, $response;
+    $self->respond($msg) if (defined($msg));
 
-       my $stat = OpenSRF::DomainObject::oilsConnectStatus->new(
-               statusCode => STATUS_COMPLETE(),
-               status => 'Request Complete' );
-
-
-       $self->session->send( ( map { ('RESULT', $_) } @{$self->{current_chunk}} ), 'STATUS' => $stat, $self->threadTrace);
+    $self->completing(1);
+    $self->respond(
+        OpenSRF::DomainObject::oilsConnectStatus->new(
+            statusCode => STATUS_COMPLETE(),
+            status => 'Request Complete'
+        )
+    );
        $self->complete(1);
 }
 
@@ -1063,22 +1167,58 @@ sub gather {
 
 
 package OpenSRF::AppSubrequest;
+use base 'OpenSRF::AppRequest';
 
 sub respond {
        my $self = shift;
+       return if $self->complete;
+
        my $resp = shift;
+    return $self->SUPER::respond($resp) if $self->respond_directly;
+
        push @{$$self{resp}}, $resp if (defined $resp);
 }
-sub respond_complete { respond(@_); }
+
+sub respond_complete {
+       my $self = shift;
+    return $self->SUPER::respond_complete(@_) if $self->respond_directly;
+       $self->respond(@_);
+       $self->complete(1);
+}
 
 sub new {
-       my $class = shift;
-       $class = ref($class) || $class;
-       return bless({resp => [], @_}, $class);
+    my $class = shift;
+    $class = ref($class) || $class;
+    my $self = bless({
+        complete        => 0,
+        respond_directly=> 0,  # use the passed session directly (RD mode)
+        resp            => [],
+        threadTrace     => 0,  # needed for respond in RD mode
+        max_chunk_count => 0,  # needed for respond in RD mode
+        max_chunk_size  => 0,  # needed for respond in RD mode
+        max_bundle_size        => 0,
+        current_bundle  => [], # needed for respond_complete in RD mode
+        current_bundle_count=> 0,
+        current_bundle_size    => 0,
+        max_bundle_count       => 0,
+        @_
+    }, $class);
+    if ($self->session) {
+        # steal the thread trace from the parent session for RD mode
+        $self->{threadTrace} = $self->session->session_threadTrace || $self->session->last_threadTrace;
+    }
+    return $self;
 }
 
 sub responses { @{$_[0]->{resp}} }
 
+sub respond_directly {
+       my $x = shift;
+       my $s = shift;
+       $x->{respond_directly} = $s if (defined $s);
+       return $x->session && $x->{respond_directly};
+}
+
 sub session {
        my $x = shift;
        my $s = shift;
@@ -1086,6 +1226,21 @@ sub session {
        return $x->{session};
 }
 
+sub complete {
+       my $x = shift;
+       my $c = shift;
+       $x->{complete} = $c if ($c);
+    $x->completing(0) if ($c);
+       return $x->{complete};
+}
+
+sub completing {
+       my $self = shift;
+       my $value = shift;
+       $self->{_completing} = $value if (defined($value));
+       return $self->{_completing};
+}
+
 sub status {}