Bug 19893: Alternative optimized indexing for Elasticsearch
authorDavid Gustafsson <david.gustafsson@ub.gu.se>
Tue, 12 Dec 2017 17:26:13 +0000 (18:26 +0100)
committerNick Clemens <nick@bywatersolutions.com>
Fri, 16 Nov 2018 11:04:56 +0000 (11:04 +0000)
Implement optimized indexing for Elasticsearch

How to test:
1) Time a full elasticsearch re-index without this patch by running the
   rebuild_elastic_search.pl with the -d flag:
   `koha-shell <instance_name> -c "time rebuild_elastic_search.pl -d"`.
2) Apply this patch.
3) Time a full re-index again, it should be about twice at fast (for a
   couple of thousand biblios, with fewer biblios results may be more
   unpredictable).

Sponsored-by: Gothenburg University Library
Signed-off-by: Ere Maijala <ere.maijala@helsinki.fi>
Signed-off-by: Martin Renvoize <martin.renvoize@ptfs-europe.com>

Signed-off-by: Nick Clemens <nick@bywatersolutions.com>

Koha/SearchEngine/Elasticsearch.pm
Koha/SearchEngine/Elasticsearch/Indexer.pm
Koha/SearchEngine/Elasticsearch/Search.pm
admin/searchengine/elasticsearch/field_config.yaml
koha-tmpl/intranet-tmpl/prog/en/modules/admin/preferences/admin.pref
misc/search_tools/rebuild_elastic_search.pl
t/Koha/SearchEngine/Elasticsearch.t
t/db_dependent/Koha_Elasticsearch.t [deleted file]
t/db_dependent/Koha_Elasticsearch_Indexer.t

index 9c114fb..93d9213 100644 (file)
@@ -34,6 +34,12 @@ use Search::Elasticsearch;
 use Try::Tiny;
 use YAML::Syck;
 
+use List::Util qw( sum0 reduce );
+use Search::Elasticsearch;
+use MARC::File::XML;
+use MIME::Base64;
+use Encode qw(encode);
+
 __PACKAGE__->mk_ro_accessors(qw( index ));
 __PACKAGE__->mk_accessors(qw( sort_fields ));
 
@@ -67,6 +73,19 @@ sub new {
     return $self;
 }
 
+sub get_elasticsearch {
+    my $self = shift @_;
+    unless (defined $self->{elasticsearch}) {
+        my $conf = $self->get_elasticsearch_params();
+        $self->{elasticsearch} = Search::Elasticsearch->new(
+            client => "5_0::Direct",
+            nodes => $conf->{nodes},
+            cxn_pool => 'Sniff'
+        );
+    }
+    return $self->{elasticsearch};
+}
+
 =head2 get_elasticsearch_params
 
     my $params = $self->get_elasticsearch_params();
@@ -281,48 +300,205 @@ sub sort_fields {
     return $self->_sort_fields_accessor();
 }
 
-# Provides the rules for data conversion.
-sub get_fixer_rules {
+sub marc_records_to_documents {
+    my ($self, $records) = @_;
+    my $rules = $self->get_marc_mapping_rules();
+    my $control_fields_rules = $rules->{control_fields};
+    my $data_fields_rules = $rules->{data_fields};
+    my $marcflavour = lc C4::Context->preference('marcflavour');
+    my $serialization_format = C4::Context->preference('ElasticsearchMARCSerializationFormat');
+
+    my @record_documents;
+
+    sub _process_mappings {
+        my ($mappings, $data, $record_document) = @_;
+        foreach my $mapping (@{$mappings}) {
+            my ($target, $options) = @{$mapping};
+            # Copy (scalar) data since can have multiple targets
+            # with differing options for (possibly) mutating data
+            # so need a different copy for each
+            my $_data = $data;
+            $record_document->{$target} //= [];
+            if (defined $options->{substr}) {
+                my ($start, $length) = @{$options->{substr}};
+                $_data = length($data) > $start ? substr $data, $start, $length : '';
+            }
+            if (defined $options->{value_callbacks}) {
+                $_data = reduce { $b->($a) } ($_data, @{$options->{value_callbacks}});
+            }
+            if (defined $options->{property}) {
+                $_data = {
+                    $options->{property} => $_data
+                }
+            }
+            push @{$record_document->{$target}}, $_data;
+        }
+    }
+    foreach my $record (@{$records}) {
+        my $record_document = {};
+        my $mappings = $rules->{leader};
+        if ($mappings) {
+            _process_mappings($mappings, $record->leader(), $record_document);
+        }
+        foreach my $field ($record->fields()) {
+            if($field->is_control_field()) {
+                my $mappings = $control_fields_rules->{$field->tag()};
+                if ($mappings) {
+                    _process_mappings($mappings, $field->data(), $record_document);
+                }
+            }
+            else {
+                my $subfields_mappings = $data_fields_rules->{$field->tag()};
+                if ($subfields_mappings) {
+                    my $wildcard_mappings = $subfields_mappings->{'*'};
+                    foreach my $subfield ($field->subfields()) {
+                        my ($code, $data) = @{$subfield};
+                        my $mappings = $subfields_mappings->{$code} // [];
+                        if ($wildcard_mappings) {
+                            $mappings = [@{$mappings}, @{$wildcard_mappings}];
+                        }
+                        if (@{$mappings}) {
+                            _process_mappings($mappings, $data, $record_document);
+                        }
+                    }
+                }
+            }
+        }
+        foreach my $field (keys %{$rules->{defaults}}) {
+            unless (defined $record_document->{$field}) {
+                $record_document->{$field} = $rules->{defaults}->{$field};
+            }
+        }
+        foreach my $field (@{$rules->{sum}}) {
+            if (defined $record_document->{$field}) {
+                # TODO: validate numeric? filter?
+                # TODO: Or should only accept fields without nested values?
+                # TODO: Quick and dirty, improve if needed
+                $record_document->{$field} = sum0(grep { !ref($_) && m/\d+(\.\d+)?/} @{$record_document->{$field}});
+            }
+        }
+        # TODO: Perhaps should check if $records_document non empty, but really should never be the case
+        $record->encoding('UTF-8');
+        if ($serialization_format eq 'base64ISO2709') {
+            $record_document->{'marc_data'} = encode_base64(encode('UTF-8', $record->as_usmarc()));
+        }
+        else {
+            $record_document->{'marc_data'} = $record->as_xml_record($marcflavour);
+        }
+        my $id = $record->subfield('999', 'c');
+        push @record_documents, [$id, $record_document];
+    }
+    return \@record_documents;
+}
+
+# Provides the rules for marc to Elasticsearch JSON document conversion.
+sub get_marc_mapping_rules {
     my ($self) = @_;
 
     my $marcflavour = lc C4::Context->preference('marcflavour');
     my @rules;
 
-    $self->_foreach_mapping(
-        sub {
-            my ( $name, $type, $facet, $suggestible, $sort, $marc_type, $marc_field ) = @_;
-            return if $marc_type ne $marcflavour;
-            my $options ='';
+    sub _field_mappings {
+        my ($facet, $suggestible, $sort, $target_name, $target_type, $range) = @_;
+        my %mapping_defaults = ();
+        my @mappings;
+
+        my $substr_args = undef;
+        if ($range) {
+            # TODO: use value_callback instead?
+            my ($start, $end) = map(int, split /-/, $range, 2);
+            $substr_args = [$start];
+            push @{$substr_args}, (defined $end ? $end - $start + 1 : 1);
+        }
+        my $default_options = {};
+        if ($substr_args) {
+            $default_options->{substr} = $substr_args;
+        }
+
+        # TODO: Should probably have per type value callback/hook
+        # but hard code for now
+        if ($target_type eq 'boolean') {
+            $default_options->{value_callbacks} //= [];
+            push @{$default_options->{value_callbacks}}, sub {
+                my ($value) = @_;
+                # Trim whitespace at both ends
+                $value =~ s/^\s+|\s+$//g;
+                return $value ? 'true' : 'false';
+            };
+        }
 
-            push @rules, "marc_map('$marc_field','${name}.\$append', $options)";
-            if ($facet) {
-                push @rules, "marc_map('$marc_field','${name}__facet.\$append', $options)";
+        my $mapping = [$target_name, $default_options];
+        push @mappings, $mapping;
+
+        my @suffixes = ();
+        push @suffixes, 'facet' if $facet;
+        push @suffixes, 'suggestion' if $suggestible;
+        push @suffixes, 'sort' if !defined $sort || $sort;
+
+        foreach my $suffix (@suffixes) {
+            my $mapping = ["${target_name}__$suffix"];
+            # Hack, fix later in less hideous manner
+            if ($suffix eq 'suggestion') {
+                push @{$mapping}, {%{$default_options}, property => 'input'};
             }
-            if ($suggestible) {
-                push @rules,
-                    #"marc_map('$marc_field','${name}__suggestion.input.\$append', '')"; #must not have nested data structures in .input
-                    "marc_map('$marc_field','${name}__suggestion.input.\$append')";
+            else {
+                push @{$mapping}, $default_options;
             }
-            if ( $type eq 'boolean' ) {
+            push @mappings, $mapping;
+        }
+        return @mappings;
+    };
+    my $field_spec_regexp = qr/^([0-9]{3})([0-9a-z]+)?(?:_\/(\d+(?:-\d+)?))?$/;
+    my $leader_regexp = qr/^leader(?:_\/(\d+(?:-\d+)?))?$/;
+    my $rules = {
+        'leader' => [],
+        'control_fields' => {},
+        'data_fields' => {},
+        'sum' => [],
+        'defaults' => {}
+    };
+
+    $self->_foreach_mapping(sub {
+        my ( $name, $type, $facet, $suggestible, $sort, $marc_type, $marc_field ) = @_;
+        return if $marc_type ne $marcflavour;
+
+        if ($type eq 'sum') {
+            push @{$rules->{sum}}, $name;
+        }
+        elsif($type eq 'boolean') {
+            # boolean gets special handling, if value doesn't exist for a field,
+            # it is set to false
+            $rules->{defaults}->{$name} = 'false';
+        }
 
-                # boolean gets special handling, basically if it doesn't exist,
-                # it's added and set to false. Otherwise we can't query it.
-                push @rules,
-                  "unless exists('$name') add_field('$name', 0) end";
-            }
-            if ($type eq 'sum' ) {
-                push @rules, "sum('$name')";
+        if ($marc_field =~ $field_spec_regexp) {
+            my $field_tag = $1;
+            my $subfields = defined $2 ? $2 : '*';
+            my $range = defined $3 ? $3 : undef;
+            if ($field_tag < 10) {
+                $rules->{control_fields}->{$field_tag} //= [];
+                my @mappings = _field_mappings($facet, $suggestible, $sort, $name, $type, $range);
+                push @{$rules->{control_fields}->{$field_tag}}, @mappings;
             }
-            if ($self->sort_fields()->{$name}) {
-                if ($sort || !defined $sort) {
-                    push @rules, "marc_map('$marc_field','${name}__sort.\$append', $options)";
+            else {
+                $rules->{data_fields}->{$field_tag} //= {};
+                foreach my $subfield (split //, $subfields) {
+                    $rules->{data_fields}->{$field_tag}->{$subfield} //= [];
+                    my @mappings = _field_mappings($facet, $suggestible, $sort, $name, $type, $range);
+                    push @{$rules->{data_fields}->{$field_tag}->{$subfield}}, @mappings;
                 }
             }
         }
-    );
-
-    push @rules, "move_field(_id,es_id)"; #Also you must set the Catmandu::Store::ElasticSearch->new(key_prefix: 'es_');
-    return \@rules;
+        elsif ($marc_field =~ $leader_regexp) {
+            my $range = defined $1 ? $1 : undef;
+            my @mappings = _field_mappings($facet, $suggestible, $sort, $name, $type, $range);
+            push @{$rules->{leader}}, @mappings;
+        }
+        else {
+            die("Invalid marc field: $marc_field");
+        }
+    });
+    return $rules;
 }
 
 =head2 _foreach_mapping
index ad0e4ca..0b7605a 100644 (file)
@@ -66,24 +66,64 @@ sub update_index {
         $self->_sanitise_records($biblionums, $records);
     }
 
-    my $from    = $self->_convert_marc_to_json($records);
-    if ( !$self->store ) {
-        my $params  = $self->get_elasticsearch_params();
-        $self->store(
-            Catmandu::Store::ElasticSearch->new(
-                %$params,
-                index_settings => $self->get_elasticsearch_settings(),
-                index_mappings => $self->get_elasticsearch_mappings(),
-            )
+    $self->ensure_mappings_updated();
+    $self->bulk_index($records);
+    return 1;
+}
+
+sub bulk_index {
+    my ($self, $records) = @_;
+    my $conf = $self->get_elasticsearch_params();
+    my $elasticsearch = $self->get_elasticsearch();
+    my $documents = $self->marc_records_to_documents($records);
+    my @body;
+
+    foreach my $document_info (@{$documents}) {
+        my ($id, $document) = @{$document_info};
+        push @body, {
+            index => {
+                _id => $id
+            }
+        };
+        push @body, $document;
+    }
+    if (@body) {
+        my $response = $elasticsearch->bulk(
+            index => $conf->{index_name},
+            type => 'data', # is just hard coded in Indexer.pm?
+            body => \@body
         );
     }
-
-    #print Data::Dumper::Dumper( $from->to_array );
-    $self->store->bag->add_many($from);
-    $self->store->bag->commit;
+    # TODO: handle response
     return 1;
 }
 
+sub ensure_mappings_updated {
+    my ($self) = @_;
+    unless ($self->{_mappings_updated}) {
+        $self->update_mappings();
+    }
+}
+
+sub update_mappings {
+    my ($self) = @_;
+    my $conf = $self->get_elasticsearch_params();
+    my $elasticsearch = $self->get_elasticsearch();
+    my $mappings = $self->get_elasticsearch_mappings();
+
+    foreach my $type (keys %{$mappings}) {
+        my $response = $elasticsearch->indices->put_mapping(
+            index => $conf->{index_name},
+            type => $type,
+            body => {
+                $type => $mappings->{$type}
+            }
+        );
+        # TODO: process response, produce errors etc
+    }
+    $self->{_mappings_updated} = 1;
+}
+
 =head2 $indexer->update_index_background($biblionums, $records)
 
 This has exactly the same API as C<update_index_background> however it'll
@@ -139,28 +179,6 @@ sub delete_index_background {
     $self->delete_index(@_);
 }
 
-=head2 $indexer->create_index();
-
-Create an index on the Elasticsearch server.
-
-=cut
-
-sub create_index {
-    my ($self) = @_;
-
-    if (!$self->store) {
-        my $params  = $self->get_elasticsearch_params();
-        $self->store(
-            Catmandu::Store::ElasticSearch->new(
-                %$params,
-                index_settings => $self->get_elasticsearch_settings(),
-                index_mappings => $self->get_elasticsearch_mappings(),
-            )
-        );
-    }
-    $self->store->bag->commit;
-}
-
 =head2 $indexer->drop_index();
 
 Drops the index from the elasticsearch server. Calling C<update_index>
@@ -170,22 +188,35 @@ after this will recreate it again.
 
 sub drop_index {
     my ($self) = @_;
-
-    if (!$self->store) {
-        # If this index doesn't exist, this will create it. Then it'll be
-        # deleted. That's not the end of the world however.
-        my $params  = $self->get_elasticsearch_params();
-        $self->store(
-            Catmandu::Store::ElasticSearch->new(
-                %$params,
-                index_settings => $self->get_elasticsearch_settings(),
-                index_mappings => $self->get_elasticsearch_mappings(),
-            )
-        );
+    if ($self->index_exists) {
+        my $conf = $self->get_elasticsearch_params();
+        my $elasticsearch = $self->get_elasticsearch();
+        my $response = $elasticsearch->indices->delete(index => $conf->{index_name});
+        # TODO: Handle response? Convert errors to exceptions/die
     }
-    my $store = $self->store;
-    $self->store(undef);
-    $store->drop();
+}
+
+sub create_index {
+    my ($self) = @_;
+    my $conf = $self->get_elasticsearch_params();
+    my $settings = $self->get_elasticsearch_settings();
+    my $elasticsearch = $self->get_elasticsearch();
+    my $response = $elasticsearch->indices->create(
+        index => $conf->{index_name},
+        body => {
+            settings => $settings
+        }
+    );
+    # TODO: Handle response? Convert errors to exceptions/die
+}
+
+sub index_exists {
+    my ($self) = @_;
+    my $conf = $self->get_elasticsearch_params();
+    my $elasticsearch = $self->get_elasticsearch();
+    return $elasticsearch->indices->exists(
+        index => $conf->{index_name},
+    );
 }
 
 sub _sanitise_records {
@@ -209,16 +240,6 @@ sub _sanitise_records {
     }
 }
 
-sub _convert_marc_to_json {
-    my $self    = shift;
-    my $records = shift;
-    my $importer =
-      Catmandu::Importer::MARC->new( records => $records, id => '999c' );
-    my $fixer = Catmandu::Fix->new( fixes => $self->get_fixer_rules() );
-    $importer = $fixer->fix($importer);
-    return $importer;
-}
-
 1;
 
 __END__
index 6340ebe..3bd27ea 100644 (file)
@@ -49,9 +49,10 @@ use Koha::SearchEngine::QueryBuilder;
 use Koha::SearchEngine::Search;
 use MARC::Record;
 use Catmandu::Store::ElasticSearch;
-
+use MARC::File::XML;
 use Data::Dumper; #TODO remove
 use Carp qw(cluck);
+use MIME::Base64;
 
 Koha::SearchEngine::Elasticsearch::Search->mk_accessors(qw( store ));
 
@@ -157,15 +158,13 @@ sub search_compat {
     my $results = $self->search($query, undef, $results_per_page, %options);
 
     # Convert each result into a MARC::Record
-    my (@records, $index);
-    $index = $offset; # opac-search expects results to be put in the
-        # right place in the array, according to $offset
+    my @records;
+    # opac-search expects results to be put in the
+    # right place in the array, according to $offset
+    my $index = $offset;
     $results->each(sub {
-            # The results come in an array for some reason
-            my $marc_json = $_[0]->{record};
-            my $marc = $self->json2marc($marc_json);
-            $records[$index++] = $marc;
-        });
+        $records[$index++] = $self->decode_record_from_result(@_);
+    });
     # consumers of this expect a name-spaced result, we provide the default
     # configuration.
     my %result;
@@ -196,14 +195,14 @@ sub search_auth_compat {
     $res->each(
         sub {
             my %result;
-            my $record    = $_[0];
-            my $marc_json = $record->{record};
 
             # I wonder if these should be real values defined in the mapping
             # rather than hard-coded conversions.
+            my $record    = $_[0];
             # Handle legacy nested arrays indexed with splitting enabled.
             my $authid = $record->{ 'Local-number' }[0];
             $authid = @$authid[0] if (ref $authid eq 'ARRAY');
+
             $result{authid} = $authid;
 
             # TODO put all this info into the record at index time so we
@@ -219,7 +218,7 @@ sub search_auth_compat {
             # it's not reproduced here yet.
             my $authtype           = $rs->single;
             my $auth_tag_to_report = $authtype ? $authtype->auth_tag_to_report : "";
-            my $marc               = $self->json2marc($marc_json);
+            my $marc               = $self->decode_record_from_result(@_);
             my $mainentry          = $marc->field($auth_tag_to_report);
             my $reported_tag;
             if ($mainentry) {
@@ -338,9 +337,7 @@ sub simple_search_compat {
     my $results = $self->search($query, undef, $max_results, %options);
     my @records;
     $results->each(sub {
-            # The results come in an array for some reason
-            my $marc_json = $_[0]->{record};
-            my $marc = $self->json2marc($marc_json);
+            my $marc = $self->decode_record_from_result(@_);
             push @records, $marc;
         });
     return (undef, \@records, $results->total);
@@ -361,43 +358,23 @@ sub extract_biblionumber {
     return Koha::SearchEngine::Search::extract_biblionumber( $searchresultrecord );
 }
 
-=head2 json2marc
-
-    my $marc = $self->json2marc($marc_json);
+=head2 decode_record_from_result
+    my $marc_record = $self->decode_record_from_result(@result);
 
-Converts the form of marc (based on its JSON, but as a Perl structure) that
-Catmandu stores into a MARC::Record object.
+Extracts marc data from Elasticsearch result and decodes to MARC::Record object
 
 =cut
 
-sub json2marc {
-    my ( $self, $marcjson ) = @_;
-
-    my $marc = MARC::Record->new();
-    $marc->encoding('UTF-8');
-
-    # fields are like:
-    # [ '245', '1', '2', 'a' => 'Title', 'b' => 'Subtitle' ]
-    # or
-    # [ '001', undef, undef, '_', 'a value' ]
-    # conveniently, this is the form that MARC::Field->new() likes
-    foreach my $field (@$marcjson) {
-        next if @$field < 5;
-        if ( $field->[0] eq 'LDR' ) {
-            $marc->leader( $field->[4] );
-        }
-        else {
-            my $tag = $field->[0];
-            my $marc_field;
-            if ( MARC::Field->is_controlfield_tag( $field->[0] ) ) {
-                $marc_field = MARC::Field->new($field->[0], $field->[4]);
-            } else {
-                $marc_field = MARC::Field->new(@$field);
-            }
-            $marc->append_fields($marc_field);
-        }
+sub decode_record_from_result {
+    # Result is passed in as array, will get flattened
+    # and first element will be $result
+    my ( $self, $result ) = @_;
+    if (C4::Context->preference('ElasticsearchMARCSerializationFormat') eq 'MARCXML') {
+        return MARC::Record->new_from_xml($result->{marc_data}, 'UTF-8', uc C4::Context->preference('marcflavour'));
+    }
+    else {
+        return MARC::Record->new_from_usmarc(decode_base64($result->{marc_data}));
     }
-    return $marc;
 }
 
 =head2 max_result_window
index dba47f5..0d60cd8 100644 (file)
@@ -5,9 +5,11 @@ general:
     type: string
     analyzer: analyser_standard
   properties:
-    record:
+    marc_data:
       store: true
       type: text
+      analyzer: keyword
+      index: false
 # Search fields
 search:
   boolean:
index 464c8c8..95bd765 100644 (file)
@@ -427,3 +427,11 @@ Administration:
               choices:
                 Zebra: Zebra
                 Elasticsearch: Elasticsearch
+        -
+            - "Use"
+            - pref: ElasticsearchMARCSerializationFormat
+              default: MARCXML
+              choices:
+                MARCXML: MARCXML
+                base64ISO2709: base64ISO2709
+            - "as serialization format for MARC records stored in Elasticsearch index. base64ISO2709 is faster and will use less space but have a maximum record length which could cause issues with very large records."
index 63065ca..589fe61 100755 (executable)
@@ -160,8 +160,13 @@ sub do_reindex {
     my ( $next, $index_name ) = @_;
 
     my $indexer = Koha::SearchEngine::Elasticsearch::Indexer->new( { index => $index_name } );
+
     if ($delete) {
-        $indexer->drop_index();
+        $indexer->drop_index() if $indexer->index_exists();
+        $indexer->create_index();
+    }
+    elsif (!$indexer->index_exists()) {
+        # Create index if does not exist
         $indexer->create_index();
     }
 
index b91fcba..d31847e 100644 (file)
 
 use Modern::Perl;
 
-use Test::More tests => 3;
+use Test::More tests => 4;
 use Test::Exception;
 
 use t::lib::Mocks;
 
+use Test::MockModule;
+
+use MARC::Record;
+
 use Koha::SearchEngine::Elasticsearch;
 
 subtest '_read_configuration() tests' => sub {
@@ -108,3 +112,237 @@ subtest 'get_elasticsearch_mappings() tests' => sub {
     $mappings = $es->get_elasticsearch_mappings();
     is( $mappings->{data}{_all}{type}, 'string', 'Field mappings parsed correctly' );
 };
+
+subtest 'Koha::SearchEngine::Elasticsearch::marc_records_to_documents () tests' => sub {
+
+    plan tests => 29;
+
+    t::lib::Mocks::mock_preference('marcflavour', 'MARC21');
+
+    my @mappings = (
+        {
+            name => 'author',
+            type => 'string',
+            facet => 1,
+            suggestible => 1,
+            sort => undef,
+            marc_type => 'marc21',
+            marc_field => '100a',
+        },
+        {
+            name => 'author',
+            type => 'string',
+            facet => 1,
+            suggestible => 1,
+            sort => 1,
+            marc_type => 'marc21',
+            marc_field => '110a',
+        },
+        {
+            name => 'title',
+            type => 'string',
+            facet => 0,
+            suggestible => 1,
+            sort => 1,
+            marc_type => 'marc21',
+            marc_field => '245a',
+        },
+        {
+            name => 'unimarc_title',
+            type => 'string',
+            facet => 0,
+            suggestible => 1,
+            sort => 1,
+            marc_type => 'unimarc',
+            marc_field => '245a',
+        },
+        {
+            name => 'title',
+            type => 'string',
+            facet => 0,
+            suggestible => undef,
+            sort => 0,
+            marc_type => 'marc21',
+            marc_field => '220',
+        },
+        {
+            name => 'sum_item_price',
+            type => 'sum',
+            facet => 0,
+            suggestible => 0,
+            sort => 0,
+            marc_type => 'marc21',
+            marc_field => '952g',
+        },
+        {
+            name => 'items_withdrawn_status',
+            type => 'boolean',
+            facet => 0,
+            suggestible => 0,
+            sort => 0,
+            marc_type => 'marc21',
+            marc_field => '9520',
+        },
+        {
+            name => 'type_of_record',
+            type => 'string',
+            facet => 0,
+            suggestible => 0,
+            sort => 0,
+            marc_type => 'marc21',
+            marc_field => 'leader_/6',
+        },
+        {
+            name => 'type_of_record_and_bib_level',
+            type => 'string',
+            facet => 0,
+            suggestible => 0,
+            sort => 0,
+            marc_type => 'marc21',
+            marc_field => 'leader_/6-7',
+        },
+    );
+
+    my $se = Test::MockModule->new('Koha::SearchEngine::Elasticsearch');
+    $se->mock('_foreach_mapping', sub {
+        my ($self, $sub) = @_;
+
+        foreach my $map (@mappings) {
+            $sub->(
+                $map->{name},
+                $map->{type},
+                $map->{facet},
+                $map->{suggestible},
+                $map->{sort},
+                $map->{marc_type},
+                $map->{marc_field}
+            );
+        }
+    });
+
+    my $see = Koha::SearchEngine::Elasticsearch->new({ index => 'biblios' });
+
+    my $marc_record_1 = MARC::Record->new();
+    $marc_record_1->leader('     cam  22      a 4500');
+    $marc_record_1->append_fields(
+        MARC::Field->new('100', '', '', a => 'Author 1'),
+        MARC::Field->new('110', '', '', a => 'Corp Author'),
+        MARC::Field->new('210', '', '', a => 'Title 1'),
+        MARC::Field->new('245', '', '', a => 'Title: first record'),
+        MARC::Field->new('999', '', '', c => '1234567'),
+        # '  ' for testing trimming of white space in boolean value callback:
+        MARC::Field->new('952', '', '', 0 => '  ', g => '123.30'),
+        MARC::Field->new('952', '', '', 0 => 0, g => '127.20'),
+    );
+    my $marc_record_2 = MARC::Record->new();
+    $marc_record_2->leader('     cam  22      a 4500');
+    $marc_record_2->append_fields(
+        MARC::Field->new('100', '', '', a => 'Author 2'),
+        # MARC::Field->new('210', '', '', a => 'Title 2'),
+        # MARC::Field->new('245', '', '', a => 'Title: second record'),
+        MARC::Field->new('999', '', '', c => '1234568'),
+        MARC::Field->new('952', '', '', 0 => 1, g => 'string where should be numeric'),
+    );
+    my $records = [$marc_record_1, $marc_record_2];
+
+    $see->get_elasticsearch_mappings(); #sort_fields will call this and use the actual db values unless we call it first
+
+    my $docs = $see->marc_records_to_documents($records);
+
+    # First record:
+
+    is(scalar @{$docs}, 2, 'Two records converted to documents');
+
+    is($docs->[0][0], '1234567', 'First document biblionumber should be set as first element in document touple');
+
+    is(scalar @{$docs->[0][1]->{author}}, 2, 'First document author field should contain two values');
+    is_deeply($docs->[0][1]->{author}, ['Author 1', 'Corp Author'], 'First document author field should be set correctly');
+
+    is(scalar @{$docs->[0][1]->{author__sort}}, 2, 'First document author__sort field should have two values');
+    is_deeply($docs->[0][1]->{author__sort}, ['Author 1', 'Corp Author'], 'First document author__sort field should be set correctly');
+
+    is(scalar @{$docs->[0][1]->{title__sort}}, 1, 'First document title__sort field should have one value');
+    is_deeply($docs->[0][1]->{title__sort}, ['Title: first record'], 'First document title__sort field should be set correctly');
+
+    is(scalar @{$docs->[0][1]->{author__suggestion}}, 2, 'First document author__suggestion field should contain two values');
+    is_deeply(
+        $docs->[0][1]->{author__suggestion},
+        [
+            {
+                'input' => 'Author 1'
+            },
+            {
+                'input' => 'Corp Author'
+            }
+        ],
+        'First document author__suggestion field should be set correctly'
+    );
+
+    is(scalar @{$docs->[0][1]->{title__suggestion}}, 1, 'First document title__suggestion field should contain one value');
+    is_deeply(
+        $docs->[0][1]->{title__suggestion},
+        [{ 'input' => 'Title: first record' }],
+        'First document title__suggestion field should be set correctly'
+    );
+
+    ok(!(defined $docs->[0][1]->{title__facet}), 'First document should have no title__facet field');
+
+    is(scalar @{$docs->[0][1]->{author__facet}}, 2, 'First document author__facet field should have two values');
+    is_deeply(
+        $docs->[0][1]->{author__facet},
+        ['Author 1', 'Corp Author'],
+        'First document author__facet field should be set correctly'
+    );
+
+    is(scalar @{$docs->[0][1]->{items_withdrawn_status}}, 2, 'First document items_withdrawn_status field should have two values');
+    is_deeply(
+        $docs->[0][1]->{items_withdrawn_status},
+        ['false', 'false'],
+        'First document items_withdrawn_status field should be set correctly'
+    );
+
+    is(
+        $docs->[0][1]->{sum_item_price},
+        '250.5',
+        'First document sum_item_price field should be set correctly'
+    );
+
+    ok(defined $docs->[0][1]->{marc_xml}, 'First document marc_xml field should be set');
+
+    is(scalar @{$docs->[0][1]->{type_of_record}}, 1, 'First document type_of_record field should have one value');
+    is_deeply(
+        $docs->[0][1]->{type_of_record},
+        ['a'],
+        'First document type_of_record field should be set correctly'
+    );
+
+    is(scalar @{$docs->[0][1]->{type_of_record_and_bib_level}}, 1, 'First document type_of_record_and_bib_level field should have one value');
+    is_deeply(
+        $docs->[0][1]->{type_of_record_and_bib_level},
+        ['am'],
+        'First document type_of_record_and_bib_level field should be set correctly'
+    );
+
+    # Second record:
+
+    is(scalar @{$docs->[1][1]->{author}}, 1, 'Second document author field should contain one value');
+    is_deeply($docs->[1][1]->{author}, ['Author 2'], 'Second document author field should be set correctly');
+
+    is(scalar @{$docs->[1][1]->{items_withdrawn_status}}, 1, 'Second document items_withdrawn_status field should have one value');
+    is_deeply(
+        $docs->[1][1]->{items_withdrawn_status},
+        ['true'],
+        'Second document items_withdrawn_status field should be set correctly'
+    );
+
+    is(
+        $docs->[1][1]->{sum_item_price},
+        0,
+        'Second document sum_item_price field should be set correctly'
+    );
+
+    # Mappings marc_type:
+
+    ok(!(defined $docs->[0][1]->{unimarc_title}), "No mapping when marc_type doesn't match marc flavour");
+
+};
diff --git a/t/db_dependent/Koha_Elasticsearch.t b/t/db_dependent/Koha_Elasticsearch.t
deleted file mode 100644 (file)
index 2602619..0000000
+++ /dev/null
@@ -1,164 +0,0 @@
-# Copyright 2015 Catalyst IT
-#
-# This file is part of Koha.
-#
-# Koha is free software; you can redistribute it and/or modify it
-# under the terms of the GNU General Public License as published by
-# the Free Software Foundation; either version 3 of the License, or
-# (at your option) any later version.
-#
-# Koha is distributed in the hope that it will be useful, but
-# WITHOUT ANY WARRANTY; without even the implied warranty of
-# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-# GNU General Public License for more details.
-#
-# You should have received a copy of the GNU General Public License
-# along with Koha; if not, see <http://www.gnu.org/licenses>
-
-use Modern::Perl;
-
-use Test::More tests => 2;
-use Test::MockModule;
-
-use t::lib::Mocks;
-use MARC::Record;
-
-my $schema = Koha::Database->schema;
-
-use_ok('Koha::SearchEngine::Elasticsearch');
-
-subtest 'get_fixer_rules() tests' => sub {
-
-    plan tests => 49;
-
-    $schema->storage->txn_begin;
-
-    t::lib::Mocks::mock_preference( 'marcflavour', 'MARC21' );
-
-    my @mappings;
-
-    my $se = Test::MockModule->new( 'Koha::SearchEngine::Elasticsearch' );
-    $se->mock( '_foreach_mapping', sub {
-        my ($self, $sub ) = @_;
-
-        foreach my $map ( @mappings ) {
-            $sub->(
-                $map->{name},
-                $map->{type},
-                $map->{facet},
-                $map->{suggestible},
-                $map->{sort},
-                $map->{marc_type},
-                $map->{marc_field}
-            );
-        }
-    });
-
-    my $see = Koha::SearchEngine::Elasticsearch->new({ index => 'biblios' });
-
-    @mappings = (
-        {
-            name => 'author',
-            type => 'string',
-            facet => 1,
-            suggestible => 1,
-            sort => undef,
-            marc_type => 'marc21',
-            marc_field => '100a',
-        },
-        {
-            name => 'author',
-            type => 'string',
-            facet => 1,
-            suggestible => 1,
-            sort => 1,
-            marc_type => 'marc21',
-            marc_field => '110a',
-        },
-    );
-
-    $see->get_elasticsearch_mappings(); #sort_fields will call this and use the actual db values unless we call it first
-    my $result = $see->get_fixer_rules();
-    is( $result->[0], q{marc_map('} . $mappings[0]->{marc_field} . q{','} . $mappings[0]->{name} . q{.$append', )});
-    is( $result->[1], q{marc_map('} . $mappings[0]->{marc_field} . q{','} . $mappings[0]->{name} . q{__facet.$append', )});
-    is( $result->[2], q{marc_map('} . $mappings[0]->{marc_field} . q{','} . $mappings[0]->{name} . q{__suggestion.input.$append')});
-    is( $result->[3], q{marc_map('} . $mappings[0]->{marc_field} . q{','} . $mappings[0]->{name} . q{__sort.$append', )});
-    is( $result->[4], q{marc_map('} . $mappings[1]->{marc_field} . q{','} . $mappings[1]->{name} . q{.$append', )});
-    is( $result->[5], q{marc_map('} . $mappings[1]->{marc_field} . q{','} . $mappings[1]->{name} . q{__facet.$append', )});
-    is( $result->[6], q{marc_map('} . $mappings[1]->{marc_field} . q{','} . $mappings[1]->{name} . q{__suggestion.input.$append')});
-    is( $result->[7], q{marc_map('} . $mappings[1]->{marc_field} . q{','} . $mappings[1]->{name} . q{__sort.$append', )});
-    is( $result->[8], q{move_field(_id,es_id)});
-
-    $mappings[0]->{type}  = 'boolean';
-    $mappings[1]->{type}  = 'boolean';
-    $result = $see->get_fixer_rules();
-    is( $result->[0], q{marc_map('} . $mappings[0]->{marc_field} . q{','} . $mappings[0]->{name} . q{.$append', )});
-    is( $result->[1], q{marc_map('} . $mappings[0]->{marc_field} . q{','} . $mappings[0]->{name} . q{__facet.$append', )});
-    is( $result->[2], q{marc_map('} . $mappings[0]->{marc_field} . q{','} . $mappings[0]->{name} . q{__suggestion.input.$append')});
-    is( $result->[3], q{unless exists('} . $mappings[0]->{name} . q{') add_field('} . $mappings[0]->{name} . q{', 0) end});
-    is( $result->[4], q{marc_map('} . $mappings[0]->{marc_field} . q{','} . $mappings[0]->{name} . q{__sort.$append', )});
-    is( $result->[5], q{marc_map('} . $mappings[1]->{marc_field} . q{','} . $mappings[1]->{name} . q{.$append', )});
-    is( $result->[6], q{marc_map('} . $mappings[1]->{marc_field} . q{','} . $mappings[1]->{name} . q{__facet.$append', )});
-    is( $result->[7], q{marc_map('} . $mappings[1]->{marc_field} . q{','} . $mappings[1]->{name} . q{__suggestion.input.$append')});
-    is( $result->[8], q{unless exists('} . $mappings[1]->{name} . q{') add_field('} . $mappings[1]->{name} . q{', 0) end});
-    is( $result->[9], q{marc_map('} . $mappings[1]->{marc_field} . q{','} . $mappings[1]->{name} . q{__sort.$append', )});
-    is( $result->[10], q{move_field(_id,es_id)});
-
-    $mappings[0]->{type}  = 'sum';
-    $mappings[1]->{type}  = 'sum';
-    $result = $see->get_fixer_rules();
-    is( $result->[0], q{marc_map('} . $mappings[0]->{marc_field} . q{','} . $mappings[0]->{name} . q{.$append', )});
-    is( $result->[1], q{marc_map('} . $mappings[0]->{marc_field} . q{','} . $mappings[0]->{name} . q{__facet.$append', )});
-    is( $result->[2], q{marc_map('} . $mappings[0]->{marc_field} . q{','} . $mappings[0]->{name} . q{__suggestion.input.$append')});
-    is( $result->[3], q{sum('} . $mappings[0]->{name} . q{')});
-    is( $result->[4], q{marc_map('} . $mappings[0]->{marc_field} . q{','} . $mappings[0]->{name} . q{__sort.$append', )});
-    is( $result->[5], q{marc_map('} . $mappings[1]->{marc_field} . q{','} . $mappings[1]->{name} . q{.$append', )});
-    is( $result->[6], q{marc_map('} . $mappings[1]->{marc_field} . q{','} . $mappings[1]->{name} . q{__facet.$append', )});
-    is( $result->[7], q{marc_map('} . $mappings[1]->{marc_field} . q{','} . $mappings[1]->{name} . q{__suggestion.input.$append')});
-    is( $result->[8], q{sum('} . $mappings[1]->{name} . q{')});
-    is( $result->[9], q{marc_map('} . $mappings[1]->{marc_field} . q{','} . $mappings[1]->{name} . q{__sort.$append', )});
-    is( $result->[10], q{move_field(_id,es_id)});
-
-    $mappings[0]->{type}  = 'string';
-    $mappings[0]->{facet} = 0;
-    $mappings[1]->{type}  = 'string';
-    $mappings[1]->{facet} = 0;
-
-    $result = $see->get_fixer_rules();
-    is( $result->[0], q{marc_map('} . $mappings[0]->{marc_field} . q{','} . $mappings[0]->{name} . q{.$append', )});
-    is( $result->[1], q{marc_map('} . $mappings[0]->{marc_field} . q{','} . $mappings[0]->{name} . q{__suggestion.input.$append')});
-    is( $result->[2], q{marc_map('} . $mappings[0]->{marc_field} . q{','} . $mappings[0]->{name} . q{__sort.$append', )});
-    is( $result->[3], q{marc_map('} . $mappings[1]->{marc_field} . q{','} . $mappings[1]->{name} . q{.$append', )});
-    is( $result->[4], q{marc_map('} . $mappings[1]->{marc_field} . q{','} . $mappings[1]->{name} . q{__suggestion.input.$append')});
-    is( $result->[5], q{marc_map('} . $mappings[1]->{marc_field} . q{','} . $mappings[1]->{name} . q{__sort.$append', )});
-    is( $result->[6], q{move_field(_id,es_id)});
-
-    $mappings[0]->{suggestible}  = 0;
-    $mappings[1]->{suggestible}  = 0;
-
-    $result = $see->get_fixer_rules();
-    is( $result->[0], q{marc_map('} . $mappings[0]->{marc_field} . q{','} . $mappings[0]->{name} . q{.$append', )});
-    is( $result->[1], q{marc_map('} . $mappings[0]->{marc_field} . q{','} . $mappings[0]->{name} . q{__sort.$append', )});
-    is( $result->[2], q{marc_map('} . $mappings[1]->{marc_field} . q{','} . $mappings[1]->{name} . q{.$append', )});
-    is( $result->[3], q{marc_map('} . $mappings[1]->{marc_field} . q{','} . $mappings[1]->{name} . q{__sort.$append', )});
-    is( $result->[4], q{move_field(_id,es_id)});
-
-    $mappings[0]->{sort}  = 0;
-    $mappings[1]->{sort}  = undef;
-
-    $see->get_elasticsearch_mappings(); #sort_fields will call this and use the actual db values unless we call it first
-    $result = $see->get_fixer_rules();
-    is( $result->[0], q{marc_map('} . $mappings[0]->{marc_field} . q{','} . $mappings[0]->{name} . q{.$append', )});
-    is( $result->[1], q{marc_map('} . $mappings[1]->{marc_field} . q{','} . $mappings[1]->{name} . q{.$append', )});
-    is( $result->[2], q{marc_map('} . $mappings[1]->{marc_field} . q{','} . $mappings[1]->{name} . q{__sort.$append', )});
-    is( $result->[3], q{move_field(_id,es_id)});
-
-    t::lib::Mocks::mock_preference( 'marcflavour', 'UNIMARC' );
-
-    $result = $see->get_fixer_rules();
-    is( $result->[0], q{move_field(_id,es_id)});
-    is( $result->[1], undef, q{No mapping when marc_type doesn't match marchflavour} );
-
-    $schema->storage->txn_rollback;
-
-};
index ac31228..235fbb5 100644 (file)
@@ -17,7 +17,7 @@
 
 use Modern::Perl;
 
-use Test::More tests => 7;
+use Test::More tests => 3;
 use Test::MockModule;
 use t::lib::Mocks;
 
@@ -37,16 +37,11 @@ ok(
 
 my $marc_record = MARC::Record->new();
 $marc_record->append_fields(
-    MARC::Field->new( '001', '1234567' ),
-    MARC::Field->new( '020', '', '', 'a' => '1234567890123' ),
-    MARC::Field->new( '245', '', '', 'a' => 'Title' )
+       MARC::Field->new( '001', '1234567' ),
+       MARC::Field->new( '020', '', '', 'a' => '1234567890123' ),
+       MARC::Field->new( '245', '', '', 'a' => 'Title' )
 );
-
 my $records = [$marc_record];
-ok( my $converted = $indexer->_convert_marc_to_json($records),
-    'Convert some records' );
-
-is( $converted->count, 1, 'One converted record' );
 
 SKIP: {
 
@@ -55,111 +50,5 @@ SKIP: {
     skip 'Elasticsearch configuration not available', 1
         if $@;
 
-    ok( $indexer->update_index(undef,$records), 'Update Index' );
+    ok( $indexer->update_index(undef, $records), 'Update Index' );
 }
-
-subtest 'create_index() tests' => sub {
-
-    plan tests => 3;
-
-    my $se = Test::MockModule->new( 'Koha::SearchEngine::Elasticsearch' );
-    $se->mock( 'get_elasticsearch_params', sub {
-        my ($self, $sub ) = @_;
-
-        my $method = $se->original( 'get_elasticsearch_params' );
-        my $params = $method->( $self );
-        $params->{index_name} .= '__test';
-        return $params;
-    });
-
-    my $indexer;
-    ok(
-        $indexer = Koha::SearchEngine::Elasticsearch::Indexer->new({ 'index' => 'biblios' }),
-        'Creating a new indexer object'
-    );
-    ok(
-        $indexer->create_index(),
-        'Creating an index'
-    );
-    $indexer->drop_index();
-    ok(
-        $indexer->drop_index(),
-        'Dropping the index'
-    );
-};
-
-subtest '_convert_marc_to_json() tests' => sub {
-
-    plan tests => 4;
-
-    $schema->storage->txn_begin;
-
-    t::lib::Mocks::mock_preference( 'marcflavour', 'MARC21' );
-
-    my @mappings = (
-        {
-            name => 'author',
-            type => 'string',
-            facet => 1,
-            suggestible => 1,
-            sort => '~',
-            marc_type => 'marc21',
-            marc_field => '100a',
-        },
-        {
-            name => 'author',
-            type => 'string',
-            facet => 1,
-            suggestible => 1,
-            sort => '~',
-            marc_type => 'marc21',
-            marc_field => '110a',
-        },
-    );
-
-
-    my $se = Test::MockModule->new( 'Koha::SearchEngine::Elasticsearch' );
-    $se->mock( '_foreach_mapping', sub {
-        my ($self, $sub ) = @_;
-
-        foreach my $map ( @mappings ) {
-            $sub->(
-                $map->{name},
-                $map->{type},
-                $map->{facet},
-                $map->{suggestible},
-                $map->{sort},
-                $map->{marc_type},
-                $map->{marc_field}
-            );
-        }
-    });
-
-    my $marc_record = MARC::Record->new();
-    $marc_record->append_fields(
-        MARC::Field->new( '001', '1234567' ),
-        MARC::Field->new( '020', '', '', 'a' => '1234567890123' ),
-        MARC::Field->new( '100', '', '', 'a' => 'Author' ),
-        MARC::Field->new( '110', '', '', 'a' => 'Corp Author' ),
-        MARC::Field->new( '245', '', '', 'a' => 'Title' ),
-    );
-    my $marc_record_2 = MARC::Record->new();
-    $marc_record_2->append_fields(
-        MARC::Field->new( '001', '1234567' ),
-        MARC::Field->new( '020', '', '', 'a' => '1234567890123' ),
-        MARC::Field->new( '100', '', '', 'a' => 'Author' ),
-        MARC::Field->new( '245', '', '', 'a' => 'Title' ),
-    );
-    my @records = ( $marc_record, $marc_record_2 );
-
-    my $importer = Koha::SearchEngine::Elasticsearch::Indexer->new({ index => 'biblios' })->_convert_marc_to_json( \@records );
-    my $conv = $importer->next();
-    is( $conv->{author}[0], "Author", "First mapped author should be 100a");
-    is( $conv->{author}[1], "Corp Author", "Second mapped author should be 110a");
-
-    $conv = $importer->next();
-    is( $conv->{author}[0], "Author", "First mapped author should be 100a");
-    is( scalar @{$conv->{author}} , 1, "We should map field only if exists, shouldn't add extra nulls");
-
-    $schema->storage->txn_rollback;
-};