payload => $payload,
complete => 0,
resp_count => 0,
- max_chunk_count => 0,
- current_chunk_count => 0,
+ max_bundle_count => 0,
+ current_bundle_count=> 0,
max_chunk_size => 0,
- current_chunk_size => 0,
- current_chunk => [],
+ max_bundle_size => 0,
+ current_bundle_size => 0,
+ current_bundle => [],
timeout_reset => 0,
recv_timeout => 30,
remaining_recv_timeout => 30,
recv_queue => [],
+ part_recv_buffer=> '',
};
bless $self => $class;
return $self;
}
-sub max_chunk_count {
+sub max_bundle_count {
my $self = shift;
my $value = shift;
- $self->{max_chunk_count} = $value if (defined($value));
- return $self->{max_chunk_count};
+ $self->{max_bundle_count} = $value if (defined($value));
+ return $self->{max_bundle_count};
}
-sub max_chunk_size {
+sub max_bundle_size {
my $self = shift;
my $value = shift;
- $self->{max_chunk_size} = $value if (defined($value));
- return $self->{max_chunk_size};
+ $self->{max_bundle_size} = $value if (defined($value));
+ return $self->{max_bundle_size};
}
sub recv_timeout {
} else {
$self->session->queue_wait(0);
}
+ $self->completing(0) if ($self->{complete});
return $self->{complete};
}
+sub completing {
+ my $self = shift;
+ my $value = shift;
+ $self->{_completing} = $value if (defined($value));
+ return $self->{_completing};
+}
+
sub duration {
my $self = shift;
$self->wait_complete;
$self->complete(1);
#return; eventually...
}
+
+ if( UNIVERSAL::isa($resp, "OpenSRF::DomainObject::oilsResult::Partial")) {
+ $self->{part_recv_buffer} .= $resp->content;
+ return 1;
+ } elsif( UNIVERSAL::isa($resp, "OpenSRF::DomainObject::oilsResult::PartialComplete")) {
+ if ($self->{part_recv_buffer}) {
+ $resp = new OpenSRF::DomainObject::oilsResult;
+ $resp->content( OpenSRF::Utils::JSON->JSON2perl( $self->{part_recv_buffer} ) );
+ $self->{part_recv_buffer} = '';
+ }
+ }
+
push @{ $self->{recv_queue} }, $resp;
}
my $msg = shift;
return unless ($self and $self->session and !$self->complete);
+
+ my $type = 'RESULT';
my $response;
- if (ref($msg) && UNIVERSAL::isa($msg, 'OpenSRF::DomainObject::oilsResult')) {
+ if (ref($msg) && UNIVERSAL::isa($msg, 'OpenSRF::DomainObject::oilsResponse')) {
$response = $msg;
- } else {
- $response = new OpenSRF::DomainObject::oilsResult;
- $response->content($msg);
- }
+ $type = 'STATUS' if UNIVERSAL::isa($response, 'OpenSRF::DomainObject::oilsStatus');
+ } elsif ($self->max_chunk_size > 0) { # we might need to chunk
+ my $str = OpenSRF::Utils::JSON->perl2JSON($msg);
+ if (length($str) > $self->max_chunk_size) { # send partials ("chunking")
+ for (my $i = 0; $i < length($str); $i += $self->max_chunk_size) {
+ $response = new OpenSRF::DomainObject::oilsResult::Partial;
+ $response->content( substr($str, $i, $self->max_chunk_size) );
+ $self->session->send($type, $response, $self->threadTrace);
+ }
+ # This triggers reconstruction on the remote end
+ $response = new OpenSRF::DomainObject::oilsResult::PartialComplete;
+ return $self->session->send($type, $response, $self->threadTrace);
+ } else {
+ $response = new OpenSRF::DomainObject::oilsResult;
+ $response->content( $msg );
+ }
+ }
- if ($self->{max_chunk_count} > 0 or $self->{max_chunk_size} > 0) { # we are chunking, and we need to test the size or count
+ if ($self->{max_bundle_count} > 0 or $self->{max_bundle_size} > 0) { # we are bundling, and we need to test the size or count
- $self->{current_chunk_size} += length(OpenSRF::Utils::JSON->perl2JSON($response));
- push @{$self->{current_chunk}}, $response;
- $self->{current_chunk_count}++;
+ $self->{current_bundle_size} += length(OpenSRF::Utils::JSON->perl2JSON($response));
+ push @{$self->{current_bundle}}, $type, $response;
+ $self->{current_bundle_count}++;
- if (
- ($self->{max_chunk_size} && $self->{current_chunk_size} >= $self->{max_chunk_size} ) ||
- ($self->{max_chunk_count} && $self->{current_chunk_count} >= $self->{max_chunk_count})
+ if ( $self->completing ||
+ ($self->{max_bundle_size} && $self->{current_bundle_size} >= $self->{max_bundle_size} ) ||
+ ($self->{max_bundle_count} && $self->{current_bundle_count} >= $self->{max_bundle_count})
) { # send chunk and reset
- my $send_res = $self->session->send(( map { ('RESULT', $_) } @{$self->{current_chunk}} ), $self->threadTrace);
- $self->{current_chunk} = [];
- $self->{current_chunk_size} = 0;
- $self->{current_chunk_count} = 0;
+ my $send_res = $self->session->send( @{$self->{current_bundle}}, $self->threadTrace);
+ $self->{current_bundle} = [];
+ $self->{current_bundle_size} = 0;
+ $self->{current_bundle_count} = 0;
return $send_res;
} else { # not at a chunk yet, just queue it up
return $self->session->app_request( $self->threadTrace );
}
}
- $self->session->send('RESULT', $response, $self->threadTrace);
+ $self->session->send($type, $response, $self->threadTrace);
}
sub respond_complete {
my $msg = shift;
return unless ($self and $self->session and !$self->complete);
- if (defined($msg)) {
- my $response;
- if (ref($msg) && UNIVERSAL::isa($msg, 'OpenSRF::DomainObject::oilsResult')) {
- $response = $msg;
- } else {
- $response = new OpenSRF::DomainObject::oilsResult;
- $response->content($msg);
- }
-
- push @{$self->{current_chunk}}, $response;
- }
-
- my $stat = OpenSRF::DomainObject::oilsConnectStatus->new(
- statusCode => STATUS_COMPLETE(),
- status => 'Request Complete' );
+ $self->respond($msg) if (defined($msg));
- $self->session->send( ( map { ('RESULT', $_) } @{$self->{current_chunk}} ), 'STATUS' => $stat, $self->threadTrace);
+ $self->completing(1);
+ $self->respond(
+ OpenSRF::DomainObject::oilsConnectStatus->new(
+ statusCode => STATUS_COMPLETE(),
+ status => 'Request Complete'
+ )
+ );
$self->complete(1);
}
my $x = shift;
my $c = shift;
$x->{complete} = $c if ($c);
+ $x->completing(0) if ($c);
return $x->{complete};
}
+sub completing {
+ my $self = shift;
+ my $value = shift;
+ $self->{_completing} = $value if (defined($value));
+ return $self->{_completing};
+}
+
sub status {}
sub STATUS_CONTINUE { return 100 }
-sub STATUS_OK { return 200 }
+sub STATUS_OK { return 200 }
sub STATUS_ACCEPTED { return 202 }
sub STATUS_COMPLETE { return 205 }
+sub STATUS_PARTIAL { return 206 }
+sub STATUS_NOCONTENT { return 204 }
+
sub STATUS_REDIRECTED { return 307 }
sub STATUS_BADREQUEST { return 400 }
#-------------------------------------------------------------------------------
+package OpenSRF::DomainObject::oilsResult::Partial;
+use OpenSRF::DomainObject::oilsResponse qw/:status/;
+use base 'OpenSRF::DomainObject::oilsResult';
+use vars qw/$status $statusCode/;
+OpenSRF::Utils::JSON->register_class_hint( hint => 'osrfResult', name => 'OpenSRF::DomainObject::oilsResult::Partial', type => 'hash' );
+
+
+$status = 'Partial Response';
+$statusCode = STATUS_PARTIAL;
+
+=head1 NAME
+
+OpenSRF::DomainObject::oilsResult::Partial
+
+=head1 SYNOPSIS
+
+This class is used internally to break apart large OpenSRF messages into small
+chunks, to reduce the maximum possible stanza size when sending a message over
+XMPP.
+
+=cut
+
+sub content {
+ my $self = shift;
+ my $val = shift;
+
+ $self->{content} = $val if (defined $val);
+ return $self->{content};
+}
+
+=head1 SEE ALSO
+
+B<OpenSRF::DomainObject::oilsResponse>
+
+=cut
+
+1;
+
+#-------------------------------------------------------------------------------
+
+package OpenSRF::DomainObject::oilsResult::PartialComplete;
+use OpenSRF::DomainObject::oilsResponse qw/:status/;
+use base 'OpenSRF::DomainObject::oilsResult';
+use vars qw/$status $statusCode/;
+OpenSRF::Utils::JSON->register_class_hint( hint => 'osrfResult', name => 'OpenSRF::DomainObject::oilsResult::Partial', type => 'hash' );
+
+
+$status = 'Partial Response Finalized';
+$statusCode = STATUS_NOCONTENT;
+
+=head1 NAME
+
+OpenSRF::DomainObject::oilsResult::Partial
+
+=head1 SYNOPSIS
+
+This class is used internally to mark the end of a stream of small partial
+OpenSRF messages of type OpenSRF::DomainObject::oilsResult::Partial.
+
+=cut
+
+sub content {
+ my $self = shift;
+ my $val = shift;
+
+ $self->{content} = $val if (defined $val);
+ return $self->{content};
+}
+
+=head1 SEE ALSO
+
+B<OpenSRF::DomainObject::oilsResponse>
+
+=cut
+
+1;
+
+#-------------------------------------------------------------------------------
+
package OpenSRF::DomainObject::oilsException;
use OpenSRF::DomainObject::oilsResponse qw/:status/;
use OpenSRF::EX;