Bug 23204: Export field weights
[koha.git] / Koha / SearchEngine / Elasticsearch.pm
1 package Koha::SearchEngine::Elasticsearch;
2
3 # Copyright 2015 Catalyst IT
4 #
5 # This file is part of Koha.
6 #
7 # Koha is free software; you can redistribute it and/or modify it
8 # under the terms of the GNU General Public License as published by
9 # the Free Software Foundation; either version 3 of the License, or
10 # (at your option) any later version.
11 #
12 # Koha is distributed in the hope that it will be useful, but
13 # WITHOUT ANY WARRANTY; without even the implied warranty of
14 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15 # GNU General Public License for more details.
16 #
17 # You should have received a copy of the GNU General Public License
18 # along with Koha; if not, see <http://www.gnu.org/licenses>.
19
20 use base qw(Class::Accessor);
21
22 use C4::Context;
23
24 use Koha::Database;
25 use Koha::Exceptions::Config;
26 use Koha::Exceptions::Elasticsearch;
27 use Koha::SearchFields;
28 use Koha::SearchMarcMaps;
29 use C4::Heading;
30
31 use Carp;
32 use Clone qw(clone);
33 use JSON;
34 use Modern::Perl;
35 use Readonly;
36 use Search::Elasticsearch;
37 use Try::Tiny;
38 use YAML::Syck;
39
40 use List::Util qw( sum0 reduce );
41 use MARC::File::XML;
42 use MIME::Base64;
43 use Encode qw(encode);
44 use Business::ISBN;
45 use Scalar::Util qw(looks_like_number);
46
47 __PACKAGE__->mk_ro_accessors(qw( index ));
48 __PACKAGE__->mk_accessors(qw( sort_fields ));
49
50 # Constants to refer to the standard index names
51 Readonly our $BIBLIOS_INDEX     => 'biblios';
52 Readonly our $AUTHORITIES_INDEX => 'authorities';
53
54 =head1 NAME
55
56 Koha::SearchEngine::Elasticsearch - Base module for things using elasticsearch
57
58 =head1 ACCESSORS
59
60 =over 4
61
62 =item index
63
64 The name of the index to use, generally 'biblios' or 'authorities'.
65
66 =back
67
68 =head1 FUNCTIONS
69
70 =cut
71
72 sub new {
73     my $class = shift @_;
74     my $self = $class->SUPER::new(@_);
75     # Check for a valid index
76     Koha::Exceptions::MissingParameter->throw('No index name provided') unless $self->index;
77     return $self;
78 }
79
80 =head2 get_elasticsearch
81
82     my $elasticsearch_client = $self->get_elasticsearch();
83
84 Returns a C<Search::Elasticsearch> client. The client is cached on a C<Koha::SearchEngine::ElasticSearch>
85 instance level and will be reused if method is called multiple times.
86
87 =cut
88
89 sub get_elasticsearch {
90     my $self = shift @_;
91     unless (defined $self->{elasticsearch}) {
92         my $conf = $self->get_elasticsearch_params();
93         $self->{elasticsearch} = Search::Elasticsearch->new($conf);
94     }
95     return $self->{elasticsearch};
96 }
97
98 =head2 get_elasticsearch_params
99
100     my $params = $self->get_elasticsearch_params();
101
102 This provides a hashref that contains the parameters for connecting to the
103 ElasicSearch servers, in the form:
104
105     {
106         'nodes' => ['127.0.0.1:9200', 'anotherserver:9200'],
107         'index_name' => 'koha_instance_index',
108     }
109
110 This is configured by the following in the C<config> block in koha-conf.xml:
111
112     <elasticsearch>
113         <server>127.0.0.1:9200</server>
114         <server>anotherserver:9200</server>
115         <index_name>koha_instance</index_name>
116     </elasticsearch>
117
118 =cut
119
120 sub get_elasticsearch_params {
121     my ($self) = @_;
122
123     # Copy the hash so that we're not modifying the original
124     my $conf = C4::Context->config('elasticsearch');
125     die "No 'elasticsearch' block is defined in koha-conf.xml.\n" if ( !$conf );
126     my $es = { %{ $conf } };
127
128     # Helpfully, the multiple server lines end up in an array for us anyway
129     # if there are multiple ones, but not if there's only one.
130     my $server = $es->{server};
131     delete $es->{server};
132     if ( ref($server) eq 'ARRAY' ) {
133
134         # store it called 'nodes' (which is used by newer Search::Elasticsearch)
135         $es->{nodes} = $server;
136     }
137     elsif ($server) {
138         $es->{nodes} = [$server];
139     }
140     else {
141         die "No elasticsearch servers were specified in koha-conf.xml.\n";
142     }
143     die "No elasticsearch index_name was specified in koha-conf.xml.\n"
144       if ( !$es->{index_name} );
145     # Append the name of this particular index to our namespace
146     $es->{index_name} .= '_' . $self->index;
147
148     $es->{key_prefix} = 'es_';
149     $es->{cxn_pool} //= 'Static';
150     $es->{request_timeout} //= 60;
151
152     return $es;
153 }
154
155 =head2 get_elasticsearch_settings
156
157     my $settings = $self->get_elasticsearch_settings();
158
159 This provides the settings provided to Elasticsearch when an index is created.
160 These can do things like define tokenization methods.
161
162 A hashref containing the settings is returned.
163
164 =cut
165
166 sub get_elasticsearch_settings {
167     my ($self) = @_;
168
169     # Use state to speed up repeated calls
170     state $settings = undef;
171     if (!defined $settings) {
172         my $config_file = C4::Context->config('elasticsearch_index_config');
173         $config_file ||= C4::Context->config('intranetdir') . '/admin/searchengine/elasticsearch/index_config.yaml';
174         $settings = LoadFile( $config_file );
175     }
176
177     return $settings;
178 }
179
180 =head2 get_elasticsearch_mappings
181
182     my $mappings = $self->get_elasticsearch_mappings();
183
184 This provides the mappings that get passed to Elasticsearch when an index is
185 created.
186
187 =cut
188
189 sub get_elasticsearch_mappings {
190     my ($self) = @_;
191
192     # Use state to speed up repeated calls
193     state %all_mappings;
194     state %sort_fields;
195
196     if (!defined $all_mappings{$self->index}) {
197         $sort_fields{$self->index} = {};
198         # Clone the general mapping to break ties with the original hash
199         my $mappings = {
200             data => clone(_get_elasticsearch_field_config('general', ''))
201         };
202         my $marcflavour = lc C4::Context->preference('marcflavour');
203         $self->_foreach_mapping(
204             sub {
205                 my ( $name, $type, $facet, $suggestible, $sort, $search, $marc_type ) = @_;
206                 return if $marc_type ne $marcflavour;
207                 # TODO if this gets any sort of complexity to it, it should
208                 # be broken out into its own function.
209
210                 # TODO be aware of date formats, but this requires pre-parsing
211                 # as ES will simply reject anything with an invalid date.
212                 my $es_type = 'text';
213                 if ($type eq 'boolean') {
214                     $es_type = 'boolean';
215                 } elsif ($type eq 'number' || $type eq 'sum') {
216                     $es_type = 'integer';
217                 } elsif ($type eq 'isbn' || $type eq 'stdno') {
218                     $es_type = 'stdno';
219                 }
220
221                 if ($search) {
222                     $mappings->{data}{properties}{$name} = _get_elasticsearch_field_config('search', $es_type);
223                 }
224
225                 if ($facet) {
226                     $mappings->{data}{properties}{ $name . '__facet' } = _get_elasticsearch_field_config('facet', $es_type);
227                 }
228                 if ($suggestible) {
229                     $mappings->{data}{properties}{ $name . '__suggestion' } = _get_elasticsearch_field_config('suggestible', $es_type);
230                 }
231                 # Sort is a bit special as it can be true, false, undef.
232                 # We care about "true" or "undef",
233                 # "undef" means to do the default thing, which is make it sortable.
234                 if (!defined $sort || $sort) {
235                     $mappings->{data}{properties}{ $name . '__sort' } = _get_elasticsearch_field_config('sort', $es_type);
236                     $sort_fields{$self->index}{$name} = 1;
237                 }
238             }
239         );
240         $all_mappings{$self->index} = $mappings;
241     }
242     $self->sort_fields(\%{$sort_fields{$self->index}});
243
244     return $all_mappings{$self->index};
245 }
246
247 =head2 raw_elasticsearch_mappings
248
249 Return elasticsearch mapping as it is in database.
250 marc_type: marc21|unimarc|normarc
251
252 $raw_mappings = raw_elasticsearch_mappings( $marc_type )
253
254 =cut
255
256 sub raw_elasticsearch_mappings {
257     my ( $marc_type ) = @_;
258
259     my $schema = Koha::Database->new()->schema();
260
261     my $search_fields = Koha::SearchFields->search();
262
263     my $mappings = {};
264     while ( my $search_field = $search_fields->next ) {
265
266         my $marc_to_fields = $schema->resultset('SearchMarcToField')->search( { search_field_id => $search_field->id } );
267
268         while ( my $marc_to_field = $marc_to_fields->next ) {
269             my $marc_map = Koha::SearchMarcMaps->find( $marc_to_field->search_marc_map_id );
270
271             next if $marc_type && $marc_map->marc_type ne $marc_type;
272
273             $mappings->{ $marc_map->index_name }{ $search_field->name }{label} = $search_field->label;
274             $mappings->{ $marc_map->index_name }{ $search_field->name }{type} = $search_field->type;
275             $mappings->{ $marc_map->index_name }{ $search_field->name }{facet_order} = $search_field->facet_order;
276             $mappings->{ $marc_map->index_name }{ $search_field->name }{weight} = $search_field->weight || undef;
277
278             push (@{ $mappings->{ $marc_map->index_name }{ $search_field->name }{mappings} },
279                 {
280                     facet   => $marc_to_field->facet || '',
281                     marc_type => $marc_map->marc_type,
282                     marc_field => $marc_map->marc_field,
283                     sort        => $marc_to_field->sort,
284                     suggestible => $marc_to_field->suggestible || ''
285                 });
286
287         }
288     }
289
290     return $mappings;
291 }
292
293 =head2 _get_elasticsearch_field_config
294
295 Get the Elasticsearch field config for the given purpose and data type.
296
297 $mapping = _get_elasticsearch_field_config('search', 'text');
298
299 =cut
300
301 sub _get_elasticsearch_field_config {
302
303     my ( $purpose, $type ) = @_;
304
305     # Use state to speed up repeated calls
306     state $settings = undef;
307     if (!defined $settings) {
308         my $config_file = C4::Context->config('elasticsearch_field_config');
309         $config_file ||= C4::Context->config('intranetdir') . '/admin/searchengine/elasticsearch/field_config.yaml';
310         $settings = LoadFile( $config_file );
311     }
312
313     if (!defined $settings->{$purpose}) {
314         die "Field purpose $purpose not defined in field config";
315     }
316     if ($type eq '') {
317         return $settings->{$purpose};
318     }
319     if (defined $settings->{$purpose}{$type}) {
320         return $settings->{$purpose}{$type};
321     }
322     if (defined $settings->{$purpose}{'default'}) {
323         return $settings->{$purpose}{'default'};
324     }
325     return;
326 }
327
328 =head2 _load_elasticsearch_mappings
329
330 Load Elasticsearch mappings in the format of mappings.yaml.
331
332 $indexes = _load_elasticsearch_mappings();
333
334 =cut
335
336 sub _load_elasticsearch_mappings {
337     my $mappings_yaml = C4::Context->config('elasticsearch_index_mappings');
338     $mappings_yaml ||= C4::Context->config('intranetdir') . '/admin/searchengine/elasticsearch/mappings.yaml';
339     return LoadFile( $mappings_yaml );
340 }
341
342 sub reset_elasticsearch_mappings {
343     my ( $self ) = @_;
344     my $indexes = $self->_load_elasticsearch_mappings();
345
346     Koha::SearchMarcMaps->delete;
347     Koha::SearchFields->delete;
348
349     while ( my ( $index_name, $fields ) = each %$indexes ) {
350         while ( my ( $field_name, $data ) = each %$fields ) {
351
352             my %sf_params = map { $_ => $data->{$_} } grep { exists $data->{$_} } qw/ type label weight staff_client opac facet_order /;
353
354             # Set default values
355             $sf_params{staff_client} //= 1;
356             $sf_params{opac} //= 1;
357
358             $sf_params{name} = $field_name;
359
360             my $search_field = Koha::SearchFields->find_or_create( \%sf_params, { key => 'name' } );
361
362             my $mappings = $data->{mappings};
363             for my $mapping ( @$mappings ) {
364                 my $marc_field = Koha::SearchMarcMaps->find_or_create({
365                     index_name => $index_name,
366                     marc_type => $mapping->{marc_type},
367                     marc_field => $mapping->{marc_field}
368                 });
369                 $search_field->add_to_search_marc_maps($marc_field, {
370                     facet => $mapping->{facet} || 0,
371                     suggestible => $mapping->{suggestible} || 0,
372                     sort => $mapping->{sort},
373                     search => $mapping->{search} // 1
374                 });
375             }
376         }
377     }
378 }
379
380 # This overrides the accessor provided by Class::Accessor so that if
381 # sort_fields isn't set, then it'll generate it.
382 sub sort_fields {
383     my $self = shift;
384     if (@_) {
385         $self->_sort_fields_accessor(@_);
386         return;
387     }
388     my $val = $self->_sort_fields_accessor();
389     return $val if $val;
390
391     # This will populate the accessor as a side effect
392     $self->get_elasticsearch_mappings();
393     return $self->_sort_fields_accessor();
394 }
395
396 =head2 _process_mappings($mappings, $data, $record_document, $meta)
397
398     $self->_process_mappings($mappings, $marc_field_data, $record_document, 0)
399
400 Process all C<$mappings> targets operating on a specific MARC field C<$data>.
401 Since we group all mappings by MARC field targets C<$mappings> will contain
402 all targets for C<$data> and thus we need to fetch the MARC field only once.
403 C<$mappings> will be applied to C<$record_document> and new field values added.
404 The method has no return value.
405
406 =over 4
407
408 =item C<$mappings>
409
410 Arrayref of mappings containing arrayrefs in the format
411 [C<$target>, C<$options>] where C<$target> is the name of the target field and
412 C<$options> is a hashref containing processing directives for this particular
413 mapping.
414
415 =item C<$data>
416
417 The source data from a MARC record field.
418
419 =item C<$record_document>
420
421 Hashref representing the Elasticsearch document on which mappings should be
422 applied.
423
424 =item C<$meta>
425
426 A hashref containing metadata useful for enforcing per mapping rules. For
427 example for providing extra context for mapping options, or treating mapping
428 targets differently depending on type (sort, search, facet etc). Combining
429 this metadata with the mapping options and metadata allows us to mutate the
430 data per mapping, or even replace it with other data retrieved from the
431 metadata context.
432
433 Current properties are:
434
435 C<altscript>: A boolean value indicating whether an alternate script presentation is being
436 processed.
437
438 C<data_source>: The source of the $<data> argument. Possible values are: 'leader', 'control_field',
439 'subfield' or 'subfields_group'.
440
441 C<code>: The code of the subfield C<$data> was retrieved, if C<data_source> is 'subfield'.
442
443 C<codes>: Subfield codes of the subfields group from which C<$data> was retrieved, if C<data_source>
444 is 'subfields_group'.
445
446 C<field>: The original C<MARC::Record> object.
447
448 =back
449
450 =cut
451
452 sub _process_mappings {
453     my ($_self, $mappings, $data, $record_document, $meta) = @_;
454     foreach my $mapping (@{$mappings}) {
455         my ($target, $options) = @{$mapping};
456
457         # Don't process sort fields for alternate scripts
458         my $sort = $target =~ /__sort$/;
459         if ($sort && $meta->{altscript}) {
460             next;
461         }
462
463         # Copy (scalar) data since can have multiple targets
464         # with differing options for (possibly) mutating data
465         # so need a different copy for each
466         my $_data = $data;
467         $record_document->{$target} //= [];
468         if (defined $options->{substr}) {
469             my ($start, $length) = @{$options->{substr}};
470             $_data = length($data) > $start ? substr $data, $start, $length : '';
471         }
472         if (defined $options->{value_callbacks}) {
473             $_data = reduce { $b->($a) } ($_data, @{$options->{value_callbacks}});
474         }
475         if (defined $options->{property}) {
476             $_data = {
477                 $options->{property} => $_data
478             }
479         }
480         if (defined $options->{nonfiling_characters_indicator}) {
481             my $nonfiling_chars = $meta->{field}->indicator($options->{nonfiling_characters_indicator});
482             $nonfiling_chars = looks_like_number($nonfiling_chars) ? int($nonfiling_chars) : 0;
483             if ($nonfiling_chars) {
484                 $_data = substr $_data, $nonfiling_chars;
485             }
486         }
487         push @{$record_document->{$target}}, $_data;
488     }
489 }
490
491 =head2 marc_records_to_documents($marc_records)
492
493     my $record_documents = $self->marc_records_to_documents($marc_records);
494
495 Using mappings stored in database convert C<$marc_records> to Elasticsearch documents.
496
497 Returns array of hash references, representing Elasticsearch documents,
498 acceptable as body payload in C<Search::Elasticsearch> requests.
499
500 =over 4
501
502 =item C<$marc_documents>
503
504 Reference to array of C<MARC::Record> objects to be converted to Elasticsearch documents.
505
506 =back
507
508 =cut
509
510 sub marc_records_to_documents {
511     my ($self, $records) = @_;
512     my $rules = $self->_get_marc_mapping_rules();
513     my $control_fields_rules = $rules->{control_fields};
514     my $data_fields_rules = $rules->{data_fields};
515     my $marcflavour = lc C4::Context->preference('marcflavour');
516     my $use_array = C4::Context->preference('ElasticsearchMARCFormat') eq 'ARRAY';
517
518     my @record_documents;
519
520     foreach my $record (@{$records}) {
521         my $record_document = {};
522         my $mappings = $rules->{leader};
523         if ($mappings) {
524             $self->_process_mappings($mappings, $record->leader(), $record_document, {
525                     altscript => 0,
526                     data_source => 'leader'
527                 }
528             );
529         }
530         foreach my $field ($record->fields()) {
531             if ($field->is_control_field()) {
532                 my $mappings = $control_fields_rules->{$field->tag()};
533                 if ($mappings) {
534                     $self->_process_mappings($mappings, $field->data(), $record_document, {
535                             altscript => 0,
536                             data_source => 'control_field',
537                             field => $field
538                         }
539                     );
540                 }
541             }
542             else {
543                 my $tag = $field->tag();
544                 # Handle alternate scripts in MARC 21
545                 my $altscript = 0;
546                 if ($marcflavour eq 'marc21' && $tag eq '880') {
547                     my $sub6 = $field->subfield('6');
548                     if ($sub6 =~ /^(...)-\d+/) {
549                         $tag = $1;
550                         $altscript = 1;
551                     }
552                 }
553
554                 my $data_field_rules = $data_fields_rules->{$tag};
555                 if ($data_field_rules) {
556                     my $subfields_mappings = $data_field_rules->{subfields};
557                     my $wildcard_mappings = $subfields_mappings->{'*'};
558                     foreach my $subfield ($field->subfields()) {
559                         my ($code, $data) = @{$subfield};
560                         my $mappings = $subfields_mappings->{$code} // [];
561                         if ($wildcard_mappings) {
562                             $mappings = [@{$mappings}, @{$wildcard_mappings}];
563                         }
564                         if (@{$mappings}) {
565                             $self->_process_mappings($mappings, $data, $record_document, {
566                                     altscript => $altscript,
567                                     data_source => 'subfield',
568                                     code => $code,
569                                     field => $field
570                                 }
571                             );
572                         }
573                         if ( defined @{$mappings}[0] && grep /match-heading/, @{@{$mappings}[0]} ){
574                             # Used by the authority linker the match-heading field requires a specific syntax
575                             # that is specified in C4/Heading
576                             my $heading = C4::Heading->new_from_field( $field, undef, 1 ); #new auth heading
577                             next unless $heading;
578                             push @{$record_document->{'match-heading'}}, $heading->search_form;
579                         }
580                     }
581
582                     my $subfields_join_mappings = $data_field_rules->{subfields_join};
583                     if ($subfields_join_mappings) {
584                         foreach my $subfields_group (keys %{$subfields_join_mappings}) {
585                             # Map each subfield to values, remove empty values, join with space
586                             my $data = join(
587                                 ' ',
588                                 grep(
589                                     $_,
590                                     map { join(' ', $field->subfield($_)) } split(//, $subfields_group)
591                                 )
592                             );
593                             if ($data) {
594                                 $self->_process_mappings($subfields_join_mappings->{$subfields_group}, $data, $record_document, {
595                                         altscript => $altscript,
596                                         data_source => 'subfields_group',
597                                         codes => $subfields_group,
598                                         field => $field
599                                     }
600                                 );
601                             }
602                             if ( grep { $_->[0] eq 'match-heading' } @{$subfields_join_mappings->{$subfields_group}} ){
603                                 # Used by the authority linker the match-heading field requires a specific syntax
604                                 # that is specified in C4/Heading
605                                 my $heading = C4::Heading->new_from_field( $field, undef, 1 ); #new auth heading
606                                 next unless $heading;
607                                 push @{$record_document->{'match-heading'}}, $heading->search_form;
608                             }
609                         }
610                     }
611                 }
612             }
613         }
614         foreach my $field (keys %{$rules->{defaults}}) {
615             unless (defined $record_document->{$field}) {
616                 $record_document->{$field} = $rules->{defaults}->{$field};
617             }
618         }
619         foreach my $field (@{$rules->{sum}}) {
620             if (defined $record_document->{$field}) {
621                 # TODO: validate numeric? filter?
622                 # TODO: Or should only accept fields without nested values?
623                 # TODO: Quick and dirty, improve if needed
624                 $record_document->{$field} = sum0(grep { !ref($_) && m/\d+(\.\d+)?/} @{$record_document->{$field}});
625             }
626         }
627         # Index all applicable ISBN forms (ISBN-10 and ISBN-13 with and without dashes)
628         foreach my $field (@{$rules->{isbn}}) {
629             if (defined $record_document->{$field}) {
630                 my @isbns = ();
631                 foreach my $input_isbn (@{$record_document->{$field}}) {
632                     my $isbn = Business::ISBN->new($input_isbn);
633                     if (defined $isbn && $isbn->is_valid) {
634                         my $isbn13 = $isbn->as_isbn13->as_string;
635                         push @isbns, $isbn13;
636                         $isbn13 =~ s/\-//g;
637                         push @isbns, $isbn13;
638
639                         my $isbn10 = $isbn->as_isbn10;
640                         if ($isbn10) {
641                             $isbn10 = $isbn10->as_string;
642                             push @isbns, $isbn10;
643                             $isbn10 =~ s/\-//g;
644                             push @isbns, $isbn10;
645                         }
646                     } else {
647                         push @isbns, $input_isbn;
648                     }
649                 }
650                 $record_document->{$field} = \@isbns;
651             }
652         }
653
654         # Remove duplicate values and collapse sort fields
655         foreach my $field (keys %{$record_document}) {
656             if (ref($record_document->{$field}) eq 'ARRAY') {
657                 @{$record_document->{$field}} = do {
658                     my %seen;
659                     grep { !$seen{ref($_) eq 'HASH' && defined $_->{input} ? $_->{input} : $_}++ } @{$record_document->{$field}};
660                 };
661                 if ($field =~ /__sort$/) {
662                     # Make sure to keep the sort field length sensible. 255 was chosen as a nice round value.
663                     $record_document->{$field} = [substr(join(' ', @{$record_document->{$field}}), 0, 255)];
664                 }
665             }
666         }
667
668         # TODO: Perhaps should check if $records_document non empty, but really should never be the case
669         $record->encoding('UTF-8');
670         if ($use_array) {
671             $record_document->{'marc_data_array'} = $self->_marc_to_array($record);
672             $record_document->{'marc_format'} = 'ARRAY';
673         } else {
674             my @warnings;
675             {
676                 # Temporarily intercept all warn signals (MARC::Record carps when record length > 99999)
677                 local $SIG{__WARN__} = sub {
678                     push @warnings, $_[0];
679                 };
680                 $record_document->{'marc_data'} = encode_base64(encode('UTF-8', $record->as_usmarc()));
681             }
682             if (@warnings) {
683                 # Suppress warnings if record length exceeded
684                 unless (substr($record->leader(), 0, 5) eq '99999') {
685                     foreach my $warning (@warnings) {
686                         carp $warning;
687                     }
688                 }
689                 $record_document->{'marc_data'} = $record->as_xml_record($marcflavour);
690                 $record_document->{'marc_format'} = 'MARCXML';
691             }
692             else {
693                 $record_document->{'marc_format'} = 'base64ISO2709';
694             }
695         }
696         push @record_documents, $record_document;
697     }
698     return \@record_documents;
699 }
700
701 =head2 _marc_to_array($record)
702
703     my @fields = _marc_to_array($record)
704
705 Convert a MARC::Record to an array modeled after MARC-in-JSON
706 (see https://github.com/marc4j/marc4j/wiki/MARC-in-JSON-Description)
707
708 =over 4
709
710 =item C<$record>
711
712 A MARC::Record object
713
714 =back
715
716 =cut
717
718 sub _marc_to_array {
719     my ($self, $record) = @_;
720
721     my $data = {
722         leader => $record->leader(),
723         fields => []
724     };
725     for my $field ($record->fields()) {
726         my $tag = $field->tag();
727         if ($field->is_control_field()) {
728             push @{$data->{fields}}, {$tag => $field->data()};
729         } else {
730             my $subfields = ();
731             foreach my $subfield ($field->subfields()) {
732                 my ($code, $contents) = @{$subfield};
733                 push @{$subfields}, {$code => $contents};
734             }
735             push @{$data->{fields}}, {
736                 $tag => {
737                     ind1 => $field->indicator(1),
738                     ind2 => $field->indicator(2),
739                     subfields => $subfields
740                 }
741             };
742         }
743     }
744     return $data;
745 }
746
747 =head2 _array_to_marc($data)
748
749     my $record = _array_to_marc($data)
750
751 Convert an array modeled after MARC-in-JSON to a MARC::Record
752
753 =over 4
754
755 =item C<$data>
756
757 An array modeled after MARC-in-JSON
758 (see https://github.com/marc4j/marc4j/wiki/MARC-in-JSON-Description)
759
760 =back
761
762 =cut
763
764 sub _array_to_marc {
765     my ($self, $data) = @_;
766
767     my $record = MARC::Record->new();
768
769     $record->leader($data->{leader});
770     for my $field (@{$data->{fields}}) {
771         my $tag = (keys %{$field})[0];
772         $field = $field->{$tag};
773         my $marc_field;
774         if (ref($field) eq 'HASH') {
775             my @subfields;
776             foreach my $subfield (@{$field->{subfields}}) {
777                 my $code = (keys %{$subfield})[0];
778                 push @subfields, $code;
779                 push @subfields, $subfield->{$code};
780             }
781             $marc_field = MARC::Field->new($tag, $field->{ind1}, $field->{ind2}, @subfields);
782         } else {
783             $marc_field = MARC::Field->new($tag, $field)
784         }
785         $record->append_fields($marc_field);
786     }
787 ;
788     return $record;
789 }
790
791 =head2 _field_mappings($facet, $suggestible, $sort, $search, $target_name, $target_type, $range)
792
793     my @mappings = _field_mappings($facet, $suggestible, $sort, $search, $target_name, $target_type, $range)
794
795 Get mappings, an internal data structure later used by
796 L<_process_mappings($mappings, $data, $record_document, $meta)> to process MARC target
797 data for a MARC mapping.
798
799 The returned C<$mappings> is not to to be confused with mappings provided by
800 C<_foreach_mapping>, rather this sub accepts properties from a mapping as
801 provided by C<_foreach_mapping> and expands it to this internal data structure.
802 In the caller context (C<_get_marc_mapping_rules>) the returned C<@mappings>
803 is then applied to each MARC target (leader, control field data, subfield or
804 joined subfields) and integrated into the mapping rules data structure used in
805 C<marc_records_to_documents> to transform MARC records into Elasticsearch
806 documents.
807
808 =over 4
809
810 =item C<$facet>
811
812 Boolean indicating whether to create a facet field for this mapping.
813
814 =item C<$suggestible>
815
816 Boolean indicating whether to create a suggestion field for this mapping.
817
818 =item C<$sort>
819
820 Boolean indicating whether to create a sort field for this mapping.
821
822 =item C<$search>
823
824 Boolean indicating whether to create a search field for this mapping.
825
826 =item C<$target_name>
827
828 Elasticsearch document target field name.
829
830 =item C<$target_type>
831
832 Elasticsearch document target field type.
833
834 =item C<$range>
835
836 An optional range as a string in the format "<START>-<END>" or "<START>",
837 where "<START>" and "<END>" are integers specifying a range that will be used
838 for extracting a substring from MARC data as Elasticsearch field target value.
839
840 The first character position is "0", and the range is inclusive,
841 so "0-2" means the first three characters of MARC data.
842
843 If only "<START>" is provided only one character at position "<START>" will
844 be extracted.
845
846 =back
847
848 =cut
849
850 sub _field_mappings {
851     my ($_self, $facet, $suggestible, $sort, $search, $target_name, $target_type, $range) = @_;
852     my %mapping_defaults = ();
853     my @mappings;
854
855     my $substr_args = undef;
856     if (defined $range) {
857         # TODO: use value_callback instead?
858         my ($start, $end) = map(int, split /-/, $range, 2);
859         $substr_args = [$start];
860         push @{$substr_args}, (defined $end ? $end - $start + 1 : 1);
861     }
862     my $default_options = {};
863     if ($substr_args) {
864         $default_options->{substr} = $substr_args;
865     }
866
867     # TODO: Should probably have per type value callback/hook
868     # but hard code for now
869     if ($target_type eq 'boolean') {
870         $default_options->{value_callbacks} //= [];
871         push @{$default_options->{value_callbacks}}, sub {
872             my ($value) = @_;
873             # Trim whitespace at both ends
874             $value =~ s/^\s+|\s+$//g;
875             return $value ? 'true' : 'false';
876         };
877     }
878
879     if ($search) {
880         my $mapping = [$target_name, $default_options];
881         push @mappings, $mapping;
882     }
883
884     my @suffixes = ();
885     push @suffixes, 'facet' if $facet;
886     push @suffixes, 'suggestion' if $suggestible;
887     push @suffixes, 'sort' if !defined $sort || $sort;
888
889     foreach my $suffix (@suffixes) {
890         my $mapping = ["${target_name}__$suffix"];
891         # TODO: Hack, fix later in less hideous manner
892         if ($suffix eq 'suggestion') {
893             push @{$mapping}, {%{$default_options}, property => 'input'};
894         }
895         else {
896             # Important! Make shallow clone, or we end up with the same hashref
897             # shared by all mappings
898             push @{$mapping}, {%{$default_options}};
899         }
900         push @mappings, $mapping;
901     }
902     return @mappings;
903 };
904
905 =head2 _get_marc_mapping_rules
906
907     my $mapping_rules = $self->_get_marc_mapping_rules()
908
909 Generates rules from mappings stored in database for MARC records to Elasticsearch JSON document conversion.
910
911 Since field retrieval is slow in C<MARC::Records> (all fields are itereted through for
912 each call to C<MARC::Record>->field) we create an optimized structure of mapping
913 rules keyed by MARC field tags holding all the mapping rules for that particular tag.
914
915 We can then iterate through all MARC fields for each record and apply all relevant
916 rules once per fields instead of retreiving fields multiple times for each mapping rule
917 which is terribly slow.
918
919 =cut
920
921 # TODO: This structure can be used for processing multiple MARC::Records so is currently
922 # rebuilt for each batch. Since it is cacheable it could also be stored in an in
923 # memory cache which it is currently not. The performance gain of caching
924 # would probably be marginal, but to do this could be a further improvement.
925
926 sub _get_marc_mapping_rules {
927     my ($self) = @_;
928     my $marcflavour = lc C4::Context->preference('marcflavour');
929     my $field_spec_regexp = qr/^([0-9]{3})([()0-9a-zA-Z]+)?(?:_\/(\d+(?:-\d+)?))?$/;
930     my $leader_regexp = qr/^leader(?:_\/(\d+(?:-\d+)?))?$/;
931     my $rules = {
932         'leader' => [],
933         'control_fields' => {},
934         'data_fields' => {},
935         'sum' => [],
936         'isbn' => [],
937         'defaults' => {}
938     };
939
940     $self->_foreach_mapping(sub {
941         my ($name, $type, $facet, $suggestible, $sort, $search, $marc_type, $marc_field) = @_;
942         return if $marc_type ne $marcflavour;
943
944         if ($type eq 'sum') {
945             push @{$rules->{sum}}, $name;
946             push @{$rules->{sum}}, $name."__sort" if $sort;
947         }
948         elsif ($type eq 'isbn') {
949             push @{$rules->{isbn}}, $name;
950         }
951         elsif ($type eq 'boolean') {
952             # boolean gets special handling, if value doesn't exist for a field,
953             # it is set to false
954             $rules->{defaults}->{$name} = 'false';
955         }
956
957         if ($marc_field =~ $field_spec_regexp) {
958             my $field_tag = $1;
959
960             my @subfields;
961             my @subfield_groups;
962             # Parse and separate subfields form subfield groups
963             if (defined $2) {
964                 my $subfield_group = '';
965                 my $open_group = 0;
966
967                 foreach my $token (split //, $2) {
968                     if ($token eq "(") {
969                         if ($open_group) {
970                             Koha::Exceptions::Elasticsearch::MARCFieldExprParseError->throw(
971                                 "Unmatched opening parenthesis for $marc_field"
972                             );
973                         }
974                         else {
975                             $open_group = 1;
976                         }
977                     }
978                     elsif ($token eq ")") {
979                         if ($open_group) {
980                             if ($subfield_group) {
981                                 push @subfield_groups, $subfield_group;
982                                 $subfield_group = '';
983                             }
984                             $open_group = 0;
985                         }
986                         else {
987                             Koha::Exceptions::Elasticsearch::MARCFieldExprParseError->throw(
988                                 "Unmatched closing parenthesis for $marc_field"
989                             );
990                         }
991                     }
992                     elsif ($open_group) {
993                         $subfield_group .= $token;
994                     }
995                     else {
996                         push @subfields, $token;
997                     }
998                 }
999             }
1000             else {
1001                 push @subfields, '*';
1002             }
1003
1004             my $range = defined $3 ? $3 : undef;
1005             my @mappings = $self->_field_mappings($facet, $suggestible, $sort, $search, $name, $type, $range);
1006             if ($field_tag < 10) {
1007                 $rules->{control_fields}->{$field_tag} //= [];
1008                 push @{$rules->{control_fields}->{$field_tag}}, @mappings;
1009             }
1010             else {
1011                 $rules->{data_fields}->{$field_tag} //= {};
1012                 foreach my $subfield (@subfields) {
1013                     $rules->{data_fields}->{$field_tag}->{subfields}->{$subfield} //= [];
1014                     push @{$rules->{data_fields}->{$field_tag}->{subfields}->{$subfield}}, @mappings;
1015                 }
1016                 foreach my $subfield_group (@subfield_groups) {
1017                     $rules->{data_fields}->{$field_tag}->{subfields_join}->{$subfield_group} //= [];
1018                     push @{$rules->{data_fields}->{$field_tag}->{subfields_join}->{$subfield_group}}, @mappings;
1019                 }
1020             }
1021         }
1022         elsif ($marc_field =~ $leader_regexp) {
1023             my $range = defined $1 ? $1 : undef;
1024             my @mappings = $self->_field_mappings($facet, $suggestible, $sort, $search, $name, $type, $range);
1025             push @{$rules->{leader}}, @mappings;
1026         }
1027         else {
1028             Koha::Exceptions::Elasticsearch::MARCFieldExprParseError->throw(
1029                 "Invalid MARC field expression: $marc_field"
1030             );
1031         }
1032     });
1033
1034     # Marc-flavour specific rule tweaks, could/should also provide hook for this
1035     if ($marcflavour eq 'marc21') {
1036         # Nonfiling characters processing for sort fields
1037         my %title_fields;
1038         if ($self->index eq $Koha::SearchEngine::BIBLIOS_INDEX) {
1039             # Format is: nonfiling characters indicator => field names list
1040             %title_fields = (
1041                 1 => [130, 630, 730, 740],
1042                 2 => [222, 240, 242, 243, 245, 440, 830]
1043             );
1044         }
1045         elsif ($self->index eq $Koha::SearchEngine::AUTHORITIES_INDEX) {
1046             %title_fields = (
1047                 1 => [730],
1048                 2 => [130, 430, 530]
1049             );
1050         }
1051         foreach my $indicator (keys %title_fields) {
1052             foreach my $field_tag (@{$title_fields{$indicator}}) {
1053                 my $mappings = $rules->{data_fields}->{$field_tag}->{subfields}->{a} // [];
1054                 foreach my $mapping (@{$mappings}) {
1055                     if ($mapping->[0] =~ /__sort$/) {
1056                         # Mark this as to be processed for nonfiling characters indicator
1057                         # later on in _process_mappings
1058                         $mapping->[1]->{nonfiling_characters_indicator} = $indicator;
1059                     }
1060                 }
1061             }
1062         }
1063     }
1064
1065     return $rules;
1066 }
1067
1068 =head2 _foreach_mapping
1069
1070     $self->_foreach_mapping(
1071         sub {
1072             my ( $name, $type, $facet, $suggestible, $sort, $marc_type,
1073                 $marc_field )
1074               = @_;
1075             return unless $marc_type eq 'marc21';
1076             print "Data comes from: " . $marc_field . "\n";
1077         }
1078     );
1079
1080 This allows you to apply a function to each entry in the elasticsearch mappings
1081 table, in order to build the mappings for whatever is needed.
1082
1083 In the provided function, the files are:
1084
1085 =over 4
1086
1087 =item C<$name>
1088
1089 The field name for elasticsearch (corresponds to the 'mapping' column in the
1090 database.
1091
1092 =item C<$type>
1093
1094 The type for this value, e.g. 'string'.
1095
1096 =item C<$facet>
1097
1098 True if this value should be facetised. This only really makes sense if the
1099 field is understood by the facet processing code anyway.
1100
1101 =item C<$sort>
1102
1103 True if this is a field that a) needs special sort handling, and b) if it
1104 should be sorted on. False if a) but not b). Undef if not a). This allows,
1105 for example, author to be sorted on but not everything marked with "author"
1106 to be included in that sort.
1107
1108 =item C<$marc_type>
1109
1110 A string that indicates the MARC type that this mapping is for, e.g. 'marc21',
1111 'unimarc', 'normarc'.
1112
1113 =item C<$marc_field>
1114
1115 A string that describes the MARC field that contains the data to extract.
1116 These are of a form suited to Catmandu's MARC fixers.
1117
1118 =back
1119
1120 =cut
1121
1122 sub _foreach_mapping {
1123     my ( $self, $sub ) = @_;
1124
1125     # TODO use a caching framework here
1126     my $search_fields = Koha::Database->schema->resultset('SearchField')->search(
1127         {
1128             'search_marc_map.index_name' => $self->index,
1129         },
1130         {   join => { search_marc_to_fields => 'search_marc_map' },
1131             '+select' => [
1132                 'search_marc_to_fields.facet',
1133                 'search_marc_to_fields.suggestible',
1134                 'search_marc_to_fields.sort',
1135                 'search_marc_to_fields.search',
1136                 'search_marc_map.marc_type',
1137                 'search_marc_map.marc_field',
1138             ],
1139             '+as'     => [
1140                 'facet',
1141                 'suggestible',
1142                 'sort',
1143                 'search',
1144                 'marc_type',
1145                 'marc_field',
1146             ],
1147         }
1148     );
1149
1150     while ( my $search_field = $search_fields->next ) {
1151         $sub->(
1152             # Force lower case on indexed field names for case insensitive
1153             # field name searches
1154             lc($search_field->name),
1155             $search_field->type,
1156             $search_field->get_column('facet'),
1157             $search_field->get_column('suggestible'),
1158             $search_field->get_column('sort'),
1159             $search_field->get_column('search'),
1160             $search_field->get_column('marc_type'),
1161             $search_field->get_column('marc_field'),
1162         );
1163     }
1164 }
1165
1166 =head2 process_error
1167
1168     die process_error($@);
1169
1170 This parses an Elasticsearch error message and produces a human-readable
1171 result from it. This result is probably missing all the useful information
1172 that you might want in diagnosing an issue, so the warning is also logged.
1173
1174 Note that currently the resulting message is not internationalised. This
1175 will happen eventually by some method or other.
1176
1177 =cut
1178
1179 sub process_error {
1180     my ($self, $msg) = @_;
1181
1182     warn $msg; # simple logging
1183
1184     # This is super-primitive
1185     return "Unable to understand your search query, please rephrase and try again.\n" if $msg =~ /ParseException/;
1186
1187     return "Unable to perform your search. Please try again.\n";
1188 }
1189
1190 =head2 _read_configuration
1191
1192     my $conf = _read_configuration();
1193
1194 Reads the I<configuration file> and returns a hash structure with the
1195 configuration information. It raises an exception if mandatory entries
1196 are missing.
1197
1198 The hashref structure has the following form:
1199
1200     {
1201         'nodes' => ['127.0.0.1:9200', 'anotherserver:9200'],
1202         'index_name' => 'koha_instance',
1203     }
1204
1205 This is configured by the following in the C<config> block in koha-conf.xml:
1206
1207     <elasticsearch>
1208         <server>127.0.0.1:9200</server>
1209         <server>anotherserver:9200</server>
1210         <index_name>koha_instance</index_name>
1211     </elasticsearch>
1212
1213 =cut
1214
1215 sub _read_configuration {
1216
1217     my $configuration;
1218
1219     my $conf = C4::Context->config('elasticsearch');
1220     Koha::Exceptions::Config::MissingEntry->throw(
1221         "Missing 'elasticsearch' block in config file")
1222       unless defined $conf;
1223
1224     if ( $conf && $conf->{server} ) {
1225         my $nodes = $conf->{server};
1226         if ( ref($nodes) eq 'ARRAY' ) {
1227             $configuration->{nodes} = $nodes;
1228         }
1229         else {
1230             $configuration->{nodes} = [$nodes];
1231         }
1232     }
1233     else {
1234         Koha::Exceptions::Config::MissingEntry->throw(
1235             "Missing 'server' entry in config file for elasticsearch");
1236     }
1237
1238     if ( defined $conf->{index_name} ) {
1239         $configuration->{index_name} = $conf->{index_name};
1240     }
1241     else {
1242         Koha::Exceptions::Config::MissingEntry->throw(
1243             "Missing 'index_name' entry in config file for elasticsearch");
1244     }
1245
1246     return $configuration;
1247 }
1248
1249 =head2 get_facetable_fields
1250
1251 my @facetable_fields = Koha::SearchEngine::Elasticsearch->get_facetable_fields();
1252
1253 Returns the list of Koha::SearchFields marked to be faceted in the ES configuration
1254
1255 =cut
1256
1257 sub get_facetable_fields {
1258     my ($self) = @_;
1259
1260     # These should correspond to the ES field names, as opposed to the CCL
1261     # things that zebra uses.
1262     my @search_field_names = qw( author itype location su-geo title-series subject ccode holdingbranch homebranch ln );
1263     my @faceted_fields = Koha::SearchFields->search(
1264         { name => { -in => \@search_field_names }, facet_order => { '!=' => undef } }, { order_by => ['facet_order'] }
1265     );
1266     my @not_faceted_fields = Koha::SearchFields->search(
1267         { name => { -in => \@search_field_names }, facet_order => undef }, { order_by => ['facet_order'] }
1268     );
1269     # This could certainly be improved
1270     return ( @faceted_fields, @not_faceted_fields );
1271 }
1272
1273 1;
1274
1275 __END__
1276
1277 =head1 AUTHOR
1278
1279 =over 4
1280
1281 =item Chris Cormack C<< <chrisc@catalyst.net.nz> >>
1282
1283 =item Robin Sheat C<< <robin@catalyst.net.nz> >>
1284
1285 =item Jonathan Druart C<< <jonathan.druart@bugs.koha-community.org> >>
1286
1287 =back
1288
1289 =cut