use OpenSRF::EX;
use OpenSRF;
use Exporter;
+use Encode;
use base qw/Exporter OpenSRF/;
use Time::HiRes qw( time usleep );
+use POSIX ();
use warnings;
use strict;
my $logger = "OpenSRF::Utils::Logger";
my $_last_locale = 'en-US';
+our $current_ingress = 'opensrf';
+
+# Get/set the locale used by all new client sessions
+# for the current process. This is primarily useful
+# for clients that wish to make a series of opensrf
+# calls and don't wish to set the locale for each new
+# AppSession object.
+#
+# The caller should reset the locale when done using
+# reset_locale(), as the locale will otherwise persist
+# for the current process until set/reset again.
+#
+# This is not for SERVER processes, since they
+# adopt the locale of their respective callers.
+sub default_locale {
+ my ($class, $locale) = @_;
+ $_last_locale = $locale if $locale;
+ return $_last_locale;
+}
+sub reset_locale {
+ my ($class) = @_;
+ return $_last_locale = 'en-US';
+}
+
+sub ingress {
+ my ($class, $ingress) = @_;
+ $current_ingress = $ingress if $ingress;
+ return $current_ingress;
+}
our %_CACHE;
our @_RESEND_QUEUE;
my $self = bless { recv_queue => [],
request_queue => [],
+ force_recycle => 0,
requests => 0,
session_data => {},
callbacks => {},
return $self->{stateless};
}
+# When true, indicates the server drone should be killed (recycled)
+# after the current session has completed. This overrides the
+# configured max_request value.
+sub force_recycle {
+ my ($self, $force) = @_;
+ $self->{force_recycle} = $force if defined $force;
+ return $self->{force_recycle};
+}
+
# When we're a client and we want to connect to a remote service
sub create {
my $class = shift;
}
$self->disconnect;
- $logger->transport( "AppSession killing self: " . $self->session_id(), DEBUG );
+ $logger->transport(sub{return "AppSession killing self: " . $self->session_id() }, DEBUG );
delete $_CACHE{$self->session_id};
delete($$self{$_}) for (keys %$self);
}
my $locale = $self->session_locale;
$msg->sender_locale($locale) if ($locale);
+
+ $msg->sender_ingress($current_ingress);
push @doc, $msg;
- $logger->info( "AppSession sending ".$msg->type." to ".$self->remote_id.
- " with threadTrace [".$msg->threadTrace."]");
+ $logger->debug(sub{return "AppSession sending ".$msg->type." to ".$self->remote_id.
+ " with threadTrace [".$msg->threadTrace."]" });
}
}
}
- #$logger->debug( ref($self). " recv_queue before wait: " . $self->_print_queue(), INTERNAL );
+ #$logger->debug(sub{return ref($self). " recv_queue before wait: " . $self->_print_queue() }, INTERNAL );
if( exists( $args{timeout} ) ) {
$args{timeout} = int($args{timeout});
$avail = @{ $self->{recv_queue} };
}
+ $self->timed_out(1) if ( $self->{remaining_recv_timeout} <= 0 );
my @list;
while ( my $msg = shift @{ $self->{recv_queue} } ) {
last if (scalar(@list) >= $args{count});
}
- $logger->debug( "Number of matched responses: " . @list, DEBUG );
+ $logger->debug(sub{return "Number of matched responses: " . @list }, DEBUG );
$self->queue_wait(0); # check for statuses
return $list[0] if (!wantarray);
return @list;
}
+sub timed_out {
+ my $self = shift;
+ my $out = shift;
+ $self->{timed_out} = $out if (defined $out);
+ return $self->{timed_out};
+}
+
sub push_resend {
my $self = shift;
push @OpenSRF::AppSession::_RESEND_QUEUE, @_;
sub flush_resend {
my $self = shift;
- $logger->debug( "Resending..." . @_RESEND_QUEUE, INTERNAL );
+ $logger->debug(sub{return "Resending..." . @_RESEND_QUEUE }, INTERNAL );
while ( my $req = shift @OpenSRF::AppSession::_RESEND_QUEUE ) {
$req->resend unless $req->complete;
}
threadTrace => $threadTrace,
payload => $payload,
complete => 0,
+ resp_count => 0,
+ max_bundle_count => 0,
+ current_bundle_count=> 0,
+ max_chunk_size => 0,
+ max_bundle_size => 0,
+ current_bundle_size => 0,
+ current_bundle => [],
timeout_reset => 0,
recv_timeout => 30,
remaining_recv_timeout => 30,
recv_queue => [],
+ part_recv_buffer=> '',
};
bless $self => $class;
return $self;
}
+sub max_bundle_count {
+ my $self = shift;
+ my $value = shift;
+ $self->{max_bundle_count} = $value if (defined($value));
+ return $self->{max_bundle_count};
+}
+
+sub max_bundle_size {
+ my $self = shift;
+ my $value = shift;
+ $self->{max_bundle_size} = $value if (defined($value));
+ return $self->{max_bundle_size};
+}
+
+sub max_chunk_size {
+ my $self = shift;
+ my $value = shift;
+ $self->{max_chunk_size} = $value if (defined($value));
+ return $self->{max_chunk_size};
+}
+
sub recv_timeout {
my $self = shift;
my $timeout = shift;
} else {
$self->session->queue_wait(0);
}
+ $self->completing(0) if ($self->{complete});
return $self->{complete};
}
+sub completing {
+ my $self = shift;
+ my $value = shift;
+ $self->{_completing} = $value if (defined($value));
+ return $self->{_completing};
+}
+
sub duration {
my $self = shift;
$self->wait_complete;
$self->complete(1);
#return; eventually...
}
+
+ if( UNIVERSAL::isa($resp, "OpenSRF::DomainObject::oilsResult::Partial")) {
+ $self->{part_recv_buffer} .= $resp->content;
+ return 1;
+ } elsif( UNIVERSAL::isa($resp, "OpenSRF::DomainObject::oilsResult::PartialComplete")) {
+ if ($self->{part_recv_buffer}) {
+ $resp = new OpenSRF::DomainObject::oilsResult;
+ $resp->content( OpenSRF::Utils::JSON->JSON2perl( $self->{part_recv_buffer} ) );
+ $self->{part_recv_buffer} = '';
+ }
+ }
+
push @{ $self->{recv_queue} }, $resp;
}
sub resend {
my $self = shift;
return unless ($self and $self->session and !$self->complete);
- OpenSRF::Utils::Logger->debug( "I'm resending the request for threadTrace ". $self->threadTrace, DEBUG);
+ OpenSRF::Utils::Logger->debug(sub{return "I'm resending the request for threadTrace ". $self->threadTrace }, DEBUG);
return $self->session->send('REQUEST', $self->payload, $self->threadTrace );
}
$self->session->send( 'STATUS',$msg, $self->threadTrace );
}
-sub stream_push {
- my $self = shift;
- my $msg = shift;
- $self->respond( $msg );
-}
+# TODO stream_push only works when server sessions can accept RESULT
+# messages, which is no longer supported. Create a new OpenSRF message
+# type to support client-to-server streams.
+#sub stream_push {
+# my $self = shift;
+# my $msg = shift;
+# $self->respond( $msg );
+#}
sub respond {
my $self = shift;
my $msg = shift;
return unless ($self and $self->session and !$self->complete);
+ my $type = 'RESULT';
my $response;
- if (ref($msg) && UNIVERSAL::isa($msg, 'OpenSRF::DomainObject::oilsResult')) {
+ if (ref($msg) && UNIVERSAL::isa($msg, 'OpenSRF::DomainObject::oilsResponse')) {
$response = $msg;
+ $type = 'STATUS' if UNIVERSAL::isa($response, 'OpenSRF::DomainObject::oilsStatus');
+
} else {
- $response = new OpenSRF::DomainObject::oilsResult;
- $response->content($msg);
- }
- $self->session->send('RESULT', $response, $self->threadTrace);
+ if ($self->max_chunk_size > 0) { # we might need to chunk
+ my $str = OpenSRF::Utils::JSON->perl2JSON($msg);
+
+ # XML can add a lot of length to a chunk due to escaping, so we
+ # calculate chunk size based on an XML-escaped version of the message.
+ # Example: If escaping doubles the length of the string then $ratio
+ # will be 0.5 and we'll cut the chunk size for this message in half.
+
+ my $raw_length = length(Encode::encode_utf8($str)); # count bytes
+ my $escaped_length = $raw_length;
+ $escaped_length += 11 * (() = ( $str =~ /"/g)); # 7 \s and "
+ $escaped_length += 4 * (() = ( $str =~ /&/g)); # &
+ $escaped_length += 3 * (() = ( $str =~ /[<>]/g)); # < / >
+
+ my $chunk_size = $self->max_chunk_size;
+
+ if ($escaped_length > $self->max_chunk_size) {
+ $chunk_size = POSIX::floor(($raw_length / $escaped_length) * $self->max_chunk_size);
+ }
+
+ if ($raw_length > $chunk_size) { # send partials ("chunking")
+ my $num_bytes = length(Encode::encode_utf8($str));
+ for (my $i = 0; $i < $num_bytes; $i += $chunk_size) {
+ $response = new OpenSRF::DomainObject::oilsResult::Partial;
+ $response->content( substr($str, $i, $chunk_size) );
+ $self->session->send($type, $response, $self->threadTrace);
+ }
+ # This triggers reconstruction on the remote end
+ $response = new OpenSRF::DomainObject::oilsResult::PartialComplete;
+ return $self->session->send($type, $response, $self->threadTrace);
+ }
+ }
+
+ # message failed to exceed max chunk size OR chunking disabled
+ $response = new OpenSRF::DomainObject::oilsResult;
+ $response->content($msg);
+ }
+
+ if ($self->{max_bundle_count} > 0 or $self->{max_bundle_size} > 0) { # we are bundling, and we need to test the size or count
+
+ $self->{current_bundle_size} += length(
+ Encode::encode_utf8(OpenSRF::Utils::JSON->perl2JSON($response)));
+ push @{$self->{current_bundle}}, $type, $response;
+ $self->{current_bundle_count}++;
+
+ if ( $self->completing ||
+ ($self->{max_bundle_size} && $self->{current_bundle_size} >= $self->{max_bundle_size} ) ||
+ ($self->{max_bundle_count} && $self->{current_bundle_count} >= $self->{max_bundle_count})
+ ) { # send chunk and reset
+ my $send_res = $self->session->send( @{$self->{current_bundle}}, $self->threadTrace);
+ $self->{current_bundle} = [];
+ $self->{current_bundle_size} = 0;
+ $self->{current_bundle_count} = 0;
+ return $send_res;
+ } else { # not at a chunk yet, just queue it up
+ return $self->session->app_request( $self->threadTrace );
+ }
+ }
+
+ $self->session->send($type, $response, $self->threadTrace);
}
sub respond_complete {
my $msg = shift;
return unless ($self and $self->session and !$self->complete);
- my $response;
- if (ref($msg) && UNIVERSAL::isa($msg, 'OpenSRF::DomainObject::oilsResult')) {
- $response = $msg;
- } else {
- $response = new OpenSRF::DomainObject::oilsResult;
- $response->content($msg);
- }
-
- my $stat = OpenSRF::DomainObject::oilsConnectStatus->new(
- statusCode => STATUS_COMPLETE(),
- status => 'Request Complete' );
+ $self->respond($msg) if (defined($msg));
-
- $self->session->send( 'RESULT' => $response, 'STATUS' => $stat, $self->threadTrace);
+ $self->completing(1);
+ $self->respond(
+ OpenSRF::DomainObject::oilsConnectStatus->new(
+ statusCode => STATUS_COMPLETE(),
+ status => 'Request Complete'
+ )
+ );
$self->complete(1);
}
package OpenSRF::AppSubrequest;
+use base 'OpenSRF::AppRequest';
sub respond {
my $self = shift;
+ return if $self->complete;
+
my $resp = shift;
+ return $self->SUPER::respond($resp) if $self->respond_directly;
+
push @{$$self{resp}}, $resp if (defined $resp);
}
-sub respond_complete { respond(@_); }
+
+sub respond_complete {
+ my $self = shift;
+ return $self->SUPER::respond_complete(@_) if $self->respond_directly;
+ $self->respond(@_);
+ $self->complete(1);
+}
sub new {
- my $class = shift;
- $class = ref($class) || $class;
- return bless({resp => [], @_}, $class);
+ my $class = shift;
+ $class = ref($class) || $class;
+ my $self = bless({
+ complete => 0,
+ respond_directly=> 0, # use the passed session directly (RD mode)
+ resp => [],
+ threadTrace => 0, # needed for respond in RD mode
+ max_chunk_count => 0, # needed for respond in RD mode
+ max_chunk_size => 0, # needed for respond in RD mode
+ max_bundle_size => 0,
+ current_bundle => [], # needed for respond_complete in RD mode
+ current_bundle_count=> 0,
+ current_bundle_size => 0,
+ max_bundle_count => 0,
+ @_
+ }, $class);
+ if ($self->session) {
+ # steal the thread trace from the parent session for RD mode
+ $self->{threadTrace} = $self->session->session_threadTrace || $self->session->last_threadTrace;
+ }
+ return $self;
}
sub responses { @{$_[0]->{resp}} }
+sub respond_directly {
+ my $x = shift;
+ my $s = shift;
+ $x->{respond_directly} = $s if (defined $s);
+ return $x->session && $x->{respond_directly};
+}
+
sub session {
my $x = shift;
my $s = shift;
return $x->{session};
}
+sub complete {
+ my $x = shift;
+ my $c = shift;
+ $x->{complete} = $c if ($c);
+ $x->completing(0) if ($c);
+ return $x->{complete};
+}
+
+sub completing {
+ my $self = shift;
+ my $value = shift;
+ $self->{_completing} = $value if (defined($value));
+ return $self->{_completing};
+}
+
sub status {}