LP#1612771: bundling and chunking
authorMike Rylander <mrylander@gmail.com>
Sun, 23 Feb 2014 19:51:13 +0000 (14:51 -0500)
committerGalen Charlton <gmc@esilibrary.com>
Tue, 1 Nov 2016 21:13:26 +0000 (17:13 -0400)
This patch is first in a series of patches that provides the following
features:

* OpenSRF message bundling -- Pack multiple OpenSRF messages together
in a single XMPP envelope, as long as we believe more messages will be
sent in the future and we are below some threshold of combined message size.
The default for that threshold is 25Kb.

 * OpenSRF message chunking -- Break up large OpenSRF messages across
multiple XMPP envelopes. This is implemented with a new OpenSRF message type.
C, Perl, and Javascript libraries are taught how to reconstruct chunked
messages. The default chunking threshold is 50Kb, just a bit below the default
ejabberd max stanza size of 64Kb.

This patch in particular renames "chunking" to "bundling", then
implements message splitting ("chunking") in Perl using two new
oilsResult subclasses

Signed-off-by: Mike Rylander <mrylander@gmail.com>
Signed-off-by: Galen Charlton <gmc@esilibrary.com>

src/perl/lib/OpenSRF/AppSession.pm
src/perl/lib/OpenSRF/Application.pm
src/perl/lib/OpenSRF/DomainObject/oilsResponse.pm

index 9e371c7..7454970 100644 (file)
@@ -858,15 +858,17 @@ sub new {
                        payload                 => $payload,
                        complete                => 0,
                        resp_count              => 0,
-                       max_chunk_count         => 0,
-                       current_chunk_count     => 0,
+                       max_bundle_count        => 0,
+                       current_bundle_count=> 0,
                        max_chunk_size          => 0,
-                       current_chunk_size      => 0,
-                       current_chunk           => [],
+                       max_bundle_size         => 0,
+                       current_bundle_size     => 0,
+                       current_bundle          => [],
                        timeout_reset           => 0,
                        recv_timeout            => 30,
                        remaining_recv_timeout  => 30,
                        recv_queue              => [],
+                       part_recv_buffer=> '',
        };
 
        bless $self => $class;
@@ -876,18 +878,18 @@ sub new {
        return $self;
 }
 
-sub max_chunk_count {
+sub max_bundle_count {
        my $self = shift;
        my $value = shift;
-       $self->{max_chunk_count} = $value if (defined($value));
-       return $self->{max_chunk_count};
+       $self->{max_bundle_count} = $value if (defined($value));
+       return $self->{max_bundle_count};
 }
 
-sub max_chunk_size {
+sub max_bundle_size {
        my $self = shift;
        my $value = shift;
-       $self->{max_chunk_size} = $value if (defined($value));
-       return $self->{max_chunk_size};
+       $self->{max_bundle_size} = $value if (defined($value));
+       return $self->{max_bundle_size};
 }
 
 sub recv_timeout {
@@ -932,9 +934,17 @@ sub complete {
        } else {
                $self->session->queue_wait(0);
        }
+    $self->completing(0) if ($self->{complete});
        return $self->{complete};
 }
 
+sub completing {
+       my $self = shift;
+       my $value = shift;
+       $self->{_completing} = $value if (defined($value));
+       return $self->{_completing};
+}
+
 sub duration {
        my $self = shift;
        $self->wait_complete;
@@ -969,6 +979,18 @@ sub push_queue {
                $self->complete(1);
                #return; eventually...
        }
+
+       if( UNIVERSAL::isa($resp, "OpenSRF::DomainObject::oilsResult::Partial")) {
+               $self->{part_recv_buffer} .= $resp->content;
+               return 1;
+       } elsif( UNIVERSAL::isa($resp, "OpenSRF::DomainObject::oilsResult::PartialComplete")) {
+               if ($self->{part_recv_buffer}) {
+                       $resp = new OpenSRF::DomainObject::oilsResult;
+                       $resp->content( OpenSRF::Utils::JSON->JSON2perl( $self->{part_recv_buffer} ) );
+                       $self->{part_recv_buffer} = '';
+               } 
+       }
+
        push @{ $self->{recv_queue} }, $resp;
 }
 
@@ -1012,35 +1034,50 @@ sub respond {
        my $msg = shift;
        return unless ($self and $self->session and !$self->complete);
 
+
+    my $type = 'RESULT';
        my $response;
-       if (ref($msg) && UNIVERSAL::isa($msg, 'OpenSRF::DomainObject::oilsResult')) {
+       if (ref($msg) && UNIVERSAL::isa($msg, 'OpenSRF::DomainObject::oilsResponse')) {
                $response = $msg;
-       } else {
-               $response = new OpenSRF::DomainObject::oilsResult;
-               $response->content($msg);
-       }
+        $type = 'STATUS' if UNIVERSAL::isa($response, 'OpenSRF::DomainObject::oilsStatus');
+       } elsif ($self->max_chunk_size > 0) { # we might need to chunk
+        my $str = OpenSRF::Utils::JSON->perl2JSON($msg);
+        if (length($str) > $self->max_chunk_size) { # send partials ("chunking")
+            for (my $i = 0; $i < length($str); $i += $self->max_chunk_size) {
+                $response = new OpenSRF::DomainObject::oilsResult::Partial;
+                       $response->content( substr($str, $i, $self->max_chunk_size) );
+                   $self->session->send($type, $response, $self->threadTrace);
+            }
+            # This triggers reconstruction on the remote end
+            $response = new OpenSRF::DomainObject::oilsResult::PartialComplete;
+               return $self->session->send($type, $response, $self->threadTrace);
+        } else {
+            $response = new OpenSRF::DomainObject::oilsResult;
+            $response->content( $msg );
+        }
+    }
 
-    if ($self->{max_chunk_count} > 0 or $self->{max_chunk_size} > 0) { # we are chunking, and we need to test the size or count
+    if ($self->{max_bundle_count} > 0 or $self->{max_bundle_size} > 0) { # we are bundling, and we need to test the size or count
 
-        $self->{current_chunk_size} += length(OpenSRF::Utils::JSON->perl2JSON($response));
-        push @{$self->{current_chunk}}, $response;  
-        $self->{current_chunk_count}++;
+        $self->{current_bundle_size} += length(OpenSRF::Utils::JSON->perl2JSON($response));
+        push @{$self->{current_bundle}}, $type, $response;  
+        $self->{current_bundle_count}++;
 
-        if (
-                ($self->{max_chunk_size}  && $self->{current_chunk_size}  >= $self->{max_chunk_size} ) ||
-                ($self->{max_chunk_count} && $self->{current_chunk_count} >= $self->{max_chunk_count})
+        if ( $self->completing ||
+                ($self->{max_bundle_size}  && $self->{current_bundle_size}  >= $self->{max_bundle_size} ) ||
+                ($self->{max_bundle_count} && $self->{current_bundle_count} >= $self->{max_bundle_count})
         ) { # send chunk and reset
-            my $send_res = $self->session->send(( map { ('RESULT', $_) } @{$self->{current_chunk}} ), $self->threadTrace);
-            $self->{current_chunk} = [];
-            $self->{current_chunk_size} = 0;
-            $self->{current_chunk_count} = 0;
+            my $send_res = $self->session->send( @{$self->{current_bundle}}, $self->threadTrace);
+            $self->{current_bundle} = [];
+            $self->{current_bundle_size} = 0;
+            $self->{current_bundle_count} = 0;
             return $send_res;
         } else { # not at a chunk yet, just queue it up
             return $self->session->app_request( $self->threadTrace );
         }
     }
 
-       $self->session->send('RESULT', $response, $self->threadTrace);
+       $self->session->send($type, $response, $self->threadTrace);
 }
 
 sub respond_complete {
@@ -1048,23 +1085,15 @@ sub respond_complete {
        my $msg = shift;
        return unless ($self and $self->session and !$self->complete);
 
-    if (defined($msg)) {
-       my $response;
-           if (ref($msg) && UNIVERSAL::isa($msg, 'OpenSRF::DomainObject::oilsResult')) {
-                   $response = $msg;
-       } else {
-               $response = new OpenSRF::DomainObject::oilsResult;
-                   $response->content($msg);
-       }
-
-        push @{$self->{current_chunk}}, $response;
-    }
-
-       my $stat = OpenSRF::DomainObject::oilsConnectStatus->new(
-               statusCode => STATUS_COMPLETE(),
-               status => 'Request Complete' );
+    $self->respond($msg) if (defined($msg));
 
-       $self->session->send( ( map { ('RESULT', $_) } @{$self->{current_chunk}} ), 'STATUS' => $stat, $self->threadTrace);
+    $self->completing(1);
+    $self->respond(
+        OpenSRF::DomainObject::oilsConnectStatus->new(
+            statusCode => STATUS_COMPLETE(),
+            status => 'Request Complete'
+        )
+    );
        $self->complete(1);
 }
 
@@ -1129,9 +1158,17 @@ sub complete {
        my $x = shift;
        my $c = shift;
        $x->{complete} = $c if ($c);
+    $x->completing(0) if ($c);
        return $x->{complete};
 }
 
+sub completing {
+       my $self = shift;
+       my $value = shift;
+       $self->{_completing} = $value if (defined($value));
+       return $self->{_completing};
+}
+
 sub status {}
 
 
index 023bb8d..9749a1d 100644 (file)
@@ -48,17 +48,24 @@ sub argc {
        return $self->{argc};
 }
 
-sub max_chunk_size {
+sub max_bundle_size {
        my $self = shift;
        return 0 unless ref($self);
-       return $self->{max_chunk_size} if (defined($self->{max_chunk_size}));
+       return $self->{max_bundle_size} if (defined($self->{max_bundle_size}));
        return 10240;
 }
 
-sub max_chunk_count {
+sub max_bundle_count {
+       my $self = shift;
+       return 0 unless ref($self);
+       return $self->{max_bundle_count} || 0;
+}
+
+sub max_chunk_size {
        my $self = shift;
        return 0 unless ref($self);
-       return $self->{max_chunk_count} || 0;
+       return $self->{max_chunk_size} if (defined($self->{max_chunk_size}));
+       return 2 * $self->max_bundle_size;
 }
 
 sub api_name {
@@ -173,8 +180,8 @@ sub handler {
                        my @args = $app_msg->params;
                        $coderef->session( $session );
                        my $appreq = OpenSRF::AppRequest->new( $session );
-                       $appreq->max_chunk_size( $coderef->max_chunk_size );
-                       $appreq->max_chunk_count( $coderef->max_chunk_count );
+                       $appreq->max_bundle_size( $coderef->max_bundle_size );
+                       $appreq->max_bundle_count( $coderef->max_bundle_count );
 
                        $log->debug( "in_request = $in_request : [" . $appreq->threadTrace."]", INTERNAL );
                        if( $in_request ) {
index 18e2e3c..25d8f50 100644 (file)
@@ -51,10 +51,13 @@ layer messages send between the client and server.
 
 sub STATUS_CONTINUE            { return 100 }
 
-sub STATUS_OK                          { return 200 }
+sub STATUS_OK                  { return 200 }
 sub STATUS_ACCEPTED            { return 202 }
 sub STATUS_COMPLETE            { return 205 }
 
+sub STATUS_PARTIAL             { return 206 }
+sub STATUS_NOCONTENT   { return 204 }
+
 sub STATUS_REDIRECTED  { return 307 }
 
 sub STATUS_BADREQUEST  { return 400 }
@@ -277,6 +280,85 @@ B<OpenSRF::DomainObject::oilsResponse>
 
 #-------------------------------------------------------------------------------
 
+package OpenSRF::DomainObject::oilsResult::Partial;
+use OpenSRF::DomainObject::oilsResponse qw/:status/;
+use base 'OpenSRF::DomainObject::oilsResult';
+use vars qw/$status $statusCode/;
+OpenSRF::Utils::JSON->register_class_hint( hint => 'osrfResult', name => 'OpenSRF::DomainObject::oilsResult::Partial', type => 'hash' );
+
+
+$status = 'Partial Response';
+$statusCode = STATUS_PARTIAL;
+
+=head1 NAME
+
+OpenSRF::DomainObject::oilsResult::Partial
+
+=head1 SYNOPSIS
+
+This class is used internally to break apart large OpenSRF messages into small
+chunks, to reduce the maximum possible stanza size when sending a message over
+XMPP.
+
+=cut
+
+sub content {
+        my $self = shift;
+       my $val = shift;
+
+       $self->{content} = $val if (defined $val);
+       return $self->{content};
+}
+
+=head1 SEE ALSO
+
+B<OpenSRF::DomainObject::oilsResponse>
+
+=cut
+
+1;
+
+#-------------------------------------------------------------------------------
+
+package OpenSRF::DomainObject::oilsResult::PartialComplete;
+use OpenSRF::DomainObject::oilsResponse qw/:status/;
+use base 'OpenSRF::DomainObject::oilsResult';
+use vars qw/$status $statusCode/;
+OpenSRF::Utils::JSON->register_class_hint( hint => 'osrfResult', name => 'OpenSRF::DomainObject::oilsResult::Partial', type => 'hash' );
+
+
+$status = 'Partial Response Finalized';
+$statusCode = STATUS_NOCONTENT;
+
+=head1 NAME
+
+OpenSRF::DomainObject::oilsResult::Partial
+
+=head1 SYNOPSIS
+
+This class is used internally to mark the end of a stream of small partial
+OpenSRF messages of type OpenSRF::DomainObject::oilsResult::Partial.
+
+=cut
+
+sub content {
+        my $self = shift;
+       my $val = shift;
+
+       $self->{content} = $val if (defined $val);
+       return $self->{content};
+}
+
+=head1 SEE ALSO
+
+B<OpenSRF::DomainObject::oilsResponse>
+
+=cut
+
+1;
+
+#-------------------------------------------------------------------------------
+
 package OpenSRF::DomainObject::oilsException;
 use OpenSRF::DomainObject::oilsResponse qw/:status/;
 use OpenSRF::EX;