my $logger = "OpenSRF::Utils::Logger";
my $_last_locale = 'en-US';
+our $current_ingress = 'opensrf';
+
+# Get/set the locale used by all new client sessions
+# for the current process. This is primarily useful
+# for clients that wish to make a series of opensrf
+# calls and don't wish to set the locale for each new
+# AppSession object.
+#
+# The caller should reset the locale when done using
+# reset_locale(), as the locale will otherwise persist
+# for the current process until set/reset again.
+#
+# This is not for SERVER processes, since they
+# adopt the locale of their respective callers.
+sub default_locale {
+ my ($class, $locale) = @_;
+ $_last_locale = $locale if $locale;
+ return $_last_locale;
+}
+sub reset_locale {
+ my ($class) = @_;
+ return $_last_locale = 'en-US';
+}
+
+sub ingress {
+ my ($class, $ingress) = @_;
+ $current_ingress = $ingress if $ingress;
+ return $current_ingress;
+}
our %_CACHE;
our @_RESEND_QUEUE;
my $locale = $self->session_locale;
$msg->sender_locale($locale) if ($locale);
+
+ $msg->sender_ingress($current_ingress);
push @doc, $msg;
threadTrace => $threadTrace,
payload => $payload,
complete => 0,
+ resp_count => 0,
+ max_bundle_count => 0,
+ current_bundle_count=> 0,
+ max_chunk_size => 0,
+ max_bundle_size => 0,
+ current_bundle_size => 0,
+ current_bundle => [],
timeout_reset => 0,
recv_timeout => 30,
remaining_recv_timeout => 30,
recv_queue => [],
+ part_recv_buffer=> '',
};
bless $self => $class;
return $self;
}
+sub max_bundle_count {
+ my $self = shift;
+ my $value = shift;
+ $self->{max_bundle_count} = $value if (defined($value));
+ return $self->{max_bundle_count};
+}
+
+sub max_bundle_size {
+ my $self = shift;
+ my $value = shift;
+ $self->{max_bundle_size} = $value if (defined($value));
+ return $self->{max_bundle_size};
+}
+
+sub max_chunk_size {
+ my $self = shift;
+ my $value = shift;
+ $self->{max_chunk_size} = $value if (defined($value));
+ return $self->{max_chunk_size};
+}
+
sub recv_timeout {
my $self = shift;
my $timeout = shift;
} else {
$self->session->queue_wait(0);
}
+ $self->completing(0) if ($self->{complete});
return $self->{complete};
}
+sub completing {
+ my $self = shift;
+ my $value = shift;
+ $self->{_completing} = $value if (defined($value));
+ return $self->{_completing};
+}
+
sub duration {
my $self = shift;
$self->wait_complete;
$self->complete(1);
#return; eventually...
}
+
+ if( UNIVERSAL::isa($resp, "OpenSRF::DomainObject::oilsResult::Partial")) {
+ $self->{part_recv_buffer} .= $resp->content;
+ return 1;
+ } elsif( UNIVERSAL::isa($resp, "OpenSRF::DomainObject::oilsResult::PartialComplete")) {
+ if ($self->{part_recv_buffer}) {
+ $resp = new OpenSRF::DomainObject::oilsResult;
+ $resp->content( OpenSRF::Utils::JSON->JSON2perl( $self->{part_recv_buffer} ) );
+ $self->{part_recv_buffer} = '';
+ }
+ }
+
push @{ $self->{recv_queue} }, $resp;
}
my $msg = shift;
return unless ($self and $self->session and !$self->complete);
+ my $type = 'RESULT';
my $response;
- if (ref($msg) && UNIVERSAL::isa($msg, 'OpenSRF::DomainObject::oilsResult')) {
+ if (ref($msg) && UNIVERSAL::isa($msg, 'OpenSRF::DomainObject::oilsResponse')) {
$response = $msg;
+ $type = 'STATUS' if UNIVERSAL::isa($response, 'OpenSRF::DomainObject::oilsStatus');
+
} else {
- $response = new OpenSRF::DomainObject::oilsResult;
- $response->content($msg);
- }
- $self->session->send('RESULT', $response, $self->threadTrace);
+ if ($self->max_chunk_size > 0) { # we might need to chunk
+ my $str = OpenSRF::Utils::JSON->perl2JSON($msg);
+
+ # XML can add a lot of length to a chunk due to escaping, so we
+ # calculate chunk size based on an XML-escaped version of the message.
+ # Example: If escaping doubles the length of the string then $ratio
+ # will be 0.5 and we'll cut the chunk size for this message in half.
+
+ my $raw_length = length($str);
+ my $escaped_length = $raw_length;
+ $escaped_length += 11 * (() = ( $str =~ /"/g)); # 7 \s and "
+ $escaped_length += 4 * (() = ( $str =~ /&/g)); # &
+ $escaped_length += 3 * (() = ( $str =~ /[<>]/g)); # < / >
+
+ my $chunk_size = $self->max_chunk_size;
+
+ if ($escaped_length > $self->max_chunk_size) {
+ $chunk_size = ($raw_length / $escaped_length) * $self->max_chunk_size;
+ }
+
+ if ($raw_length > $chunk_size) { # send partials ("chunking")
+ for (my $i = 0; $i < length($str); $i += $chunk_size) {
+ $response = new OpenSRF::DomainObject::oilsResult::Partial;
+ $response->content( substr($str, $i, $chunk_size) );
+ $self->session->send($type, $response, $self->threadTrace);
+ }
+ # This triggers reconstruction on the remote end
+ $response = new OpenSRF::DomainObject::oilsResult::PartialComplete;
+ return $self->session->send($type, $response, $self->threadTrace);
+ }
+ }
+
+ # message failed to exceed max chunk size OR chunking disabled
+ $response = new OpenSRF::DomainObject::oilsResult;
+ $response->content($msg);
+ }
+
+ if ($self->{max_bundle_count} > 0 or $self->{max_bundle_size} > 0) { # we are bundling, and we need to test the size or count
+
+ $self->{current_bundle_size} += length(OpenSRF::Utils::JSON->perl2JSON($response));
+ push @{$self->{current_bundle}}, $type, $response;
+ $self->{current_bundle_count}++;
+
+ if ( $self->completing ||
+ ($self->{max_bundle_size} && $self->{current_bundle_size} >= $self->{max_bundle_size} ) ||
+ ($self->{max_bundle_count} && $self->{current_bundle_count} >= $self->{max_bundle_count})
+ ) { # send chunk and reset
+ my $send_res = $self->session->send( @{$self->{current_bundle}}, $self->threadTrace);
+ $self->{current_bundle} = [];
+ $self->{current_bundle_size} = 0;
+ $self->{current_bundle_count} = 0;
+ return $send_res;
+ } else { # not at a chunk yet, just queue it up
+ return $self->session->app_request( $self->threadTrace );
+ }
+ }
+
+ $self->session->send($type, $response, $self->threadTrace);
}
sub respond_complete {
my $msg = shift;
return unless ($self and $self->session and !$self->complete);
- my $response;
- if (ref($msg) && UNIVERSAL::isa($msg, 'OpenSRF::DomainObject::oilsResult')) {
- $response = $msg;
- } else {
- $response = new OpenSRF::DomainObject::oilsResult;
- $response->content($msg);
- }
-
- my $stat = OpenSRF::DomainObject::oilsConnectStatus->new(
- statusCode => STATUS_COMPLETE(),
- status => 'Request Complete' );
+ $self->respond($msg) if (defined($msg));
-
- $self->session->send( 'RESULT' => $response, 'STATUS' => $stat, $self->threadTrace);
+ $self->completing(1);
+ $self->respond(
+ OpenSRF::DomainObject::oilsConnectStatus->new(
+ statusCode => STATUS_COMPLETE(),
+ status => 'Request Complete'
+ )
+ );
$self->complete(1);
}
package OpenSRF::AppSubrequest;
+use base 'OpenSRF::AppRequest';
sub respond {
my $self = shift;
+ return if $self->complete;
+
my $resp = shift;
+ return $self->SUPER::respond($resp) if $self->respond_directly;
+
push @{$$self{resp}}, $resp if (defined $resp);
}
-sub respond_complete { respond(@_); }
+
+sub respond_complete {
+ my $self = shift;
+ return $self->SUPER::respond_complete(@_) if $self->respond_directly;
+ $self->respond(@_);
+ $self->complete(1);
+}
sub new {
- my $class = shift;
- $class = ref($class) || $class;
- return bless({resp => [], @_}, $class);
+ my $class = shift;
+ $class = ref($class) || $class;
+ my $self = bless({
+ complete => 0,
+ respond_directly=> 0, # use the passed session directly (RD mode)
+ resp => [],
+ threadTrace => 0, # needed for respond in RD mode
+ max_chunk_count => 0, # needed for respond in RD mode
+ max_chunk_size => 0, # needed for respond in RD mode
+ max_bundle_size => 0,
+ current_bundle => [], # needed for respond_complete in RD mode
+ current_bundle_count=> 0,
+ current_bundle_size => 0,
+ max_bundle_count => 0,
+ @_
+ }, $class);
+ if ($self->session) {
+ # steal the thread trace from the parent session for RD mode
+ $self->{threadTrace} = $self->session->session_threadTrace || $self->session->last_threadTrace;
+ }
+ return $self;
}
sub responses { @{$_[0]->{resp}} }
+sub respond_directly {
+ my $x = shift;
+ my $s = shift;
+ $x->{respond_directly} = $s if (defined $s);
+ return $x->session && $x->{respond_directly};
+}
+
sub session {
my $x = shift;
my $s = shift;
return $x->{session};
}
+sub complete {
+ my $x = shift;
+ my $c = shift;
+ $x->{complete} = $c if ($c);
+ $x->completing(0) if ($c);
+ return $x->{complete};
+}
+
+sub completing {
+ my $self = shift;
+ my $value = shift;
+ $self->{_completing} = $value if (defined($value));
+ return $self->{_completing};
+}
+
sub status {}