Bug 18235: (QA follow-up) Series facet name after rebase
[koha-equinox.git] / Koha / SearchEngine / Elasticsearch.pm
1 package Koha::SearchEngine::Elasticsearch;
2
3 # Copyright 2015 Catalyst IT
4 #
5 # This file is part of Koha.
6 #
7 # Koha is free software; you can redistribute it and/or modify it under the
8 # terms of the GNU General Public License as published by the Free Software
9 # Foundation; either version 3 of the License, or (at your option) any later
10 # version.
11 #
12 # Koha is distributed in the hope that it will be useful, but WITHOUT ANY
13 # WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
14 # A PARTICULAR PURPOSE.  See the GNU General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License along
17 # with Koha; if not, write to the Free Software Foundation, Inc.,
18 # 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
19
20 use base qw(Class::Accessor);
21
22 use C4::Context;
23
24 use Koha::Database;
25 use Koha::Exceptions::Config;
26 use Koha::SearchFields;
27 use Koha::SearchMarcMaps;
28
29 use Carp;
30 use JSON;
31 use Modern::Perl;
32 use Readonly;
33 use Search::Elasticsearch;
34 use Try::Tiny;
35 use YAML::Syck;
36
37 use List::Util qw( sum0 reduce );
38 use MARC::File::XML;
39 use MIME::Base64;
40 use Encode qw(encode);
41 use Business::ISBN;
42
43 __PACKAGE__->mk_ro_accessors(qw( index ));
44 __PACKAGE__->mk_accessors(qw( sort_fields ));
45
46 # Constants to refer to the standard index names
47 Readonly our $BIBLIOS_INDEX     => 'biblios';
48 Readonly our $AUTHORITIES_INDEX => 'authorities';
49
50 =head1 NAME
51
52 Koha::SearchEngine::Elasticsearch - Base module for things using elasticsearch
53
54 =head1 ACCESSORS
55
56 =over 4
57
58 =item index
59
60 The name of the index to use, generally 'biblios' or 'authorities'.
61
62 =back
63
64 =head1 FUNCTIONS
65
66 =cut
67
68 sub new {
69     my $class = shift @_;
70     my $self = $class->SUPER::new(@_);
71     # Check for a valid index
72     Koha::Exceptions::MissingParameter->throw('No index name provided') unless $self->index;
73     return $self;
74 }
75
76 =head2 get_elasticsearch
77
78     my $elasticsearch_client = $self->get_elasticsearch();
79
80 Returns a C<Search::Elasticsearch> client. The client is cached on a C<Koha::SearchEngine::ElasticSearch>
81 instance level and will be reused if method is called multiple times.
82
83 =cut
84
85 sub get_elasticsearch {
86     my $self = shift @_;
87     unless (defined $self->{elasticsearch}) {
88         my $conf = $self->get_elasticsearch_params();
89         $self->{elasticsearch} = Search::Elasticsearch->new($conf);
90     }
91     return $self->{elasticsearch};
92 }
93
94 =head2 get_elasticsearch_params
95
96     my $params = $self->get_elasticsearch_params();
97
98 This provides a hashref that contains the parameters for connecting to the
99 ElasicSearch servers, in the form:
100
101     {
102         'nodes' => ['127.0.0.1:9200', 'anotherserver:9200'],
103         'index_name' => 'koha_instance_index',
104     }
105
106 This is configured by the following in the C<config> block in koha-conf.xml:
107
108     <elasticsearch>
109         <server>127.0.0.1:9200</server>
110         <server>anotherserver:9200</server>
111         <index_name>koha_instance</index_name>
112     </elasticsearch>
113
114 =cut
115
116 sub get_elasticsearch_params {
117     my ($self) = @_;
118
119     # Copy the hash so that we're not modifying the original
120     my $conf = C4::Context->config('elasticsearch');
121     die "No 'elasticsearch' block is defined in koha-conf.xml.\n" if ( !$conf );
122     my $es = { %{ $conf } };
123
124     # Helpfully, the multiple server lines end up in an array for us anyway
125     # if there are multiple ones, but not if there's only one.
126     my $server = $es->{server};
127     delete $es->{server};
128     if ( ref($server) eq 'ARRAY' ) {
129
130         # store it called 'nodes' (which is used by newer Search::Elasticsearch)
131         $es->{nodes} = $server;
132     }
133     elsif ($server) {
134         $es->{nodes} = [$server];
135     }
136     else {
137         die "No elasticsearch servers were specified in koha-conf.xml.\n";
138     }
139     die "No elasticsearch index_name was specified in koha-conf.xml.\n"
140       if ( !$es->{index_name} );
141     # Append the name of this particular index to our namespace
142     $es->{index_name} .= '_' . $self->index;
143
144     $es->{key_prefix} = 'es_';
145     $es->{client} //= '5_0::Direct';
146     $es->{cxn_pool} //= 'Sniff';
147     $es->{request_timeout} //= 60;
148
149     return $es;
150 }
151
152 =head2 get_elasticsearch_settings
153
154     my $settings = $self->get_elasticsearch_settings();
155
156 This provides the settings provided to Elasticsearch when an index is created.
157 These can do things like define tokenization methods.
158
159 A hashref containing the settings is returned.
160
161 =cut
162
163 sub get_elasticsearch_settings {
164     my ($self) = @_;
165
166     # Use state to speed up repeated calls
167     state $settings = undef;
168     if (!defined $settings) {
169         my $config_file = C4::Context->config('elasticsearch_index_config');
170         $config_file ||= C4::Context->config('intranetdir') . '/admin/searchengine/elasticsearch/index_config.yaml';
171         $settings = LoadFile( $config_file );
172     }
173
174     return $settings;
175 }
176
177 =head2 get_elasticsearch_mappings
178
179     my $mappings = $self->get_elasticsearch_mappings();
180
181 This provides the mappings that get passed to Elasticsearch when an index is
182 created.
183
184 =cut
185
186 sub get_elasticsearch_mappings {
187     my ($self) = @_;
188
189     # Use state to speed up repeated calls
190     state %all_mappings;
191     state %sort_fields;
192
193     if (!defined $all_mappings{$self->index}) {
194         $sort_fields{$self->index} = {};
195         my $mappings = {
196             data => scalar _get_elasticsearch_mapping('general', '')
197         };
198         my $marcflavour = lc C4::Context->preference('marcflavour');
199         $self->_foreach_mapping(
200             sub {
201                 my ( $name, $type, $facet, $suggestible, $sort, $marc_type ) = @_;
202                 return if $marc_type ne $marcflavour;
203                 # TODO if this gets any sort of complexity to it, it should
204                 # be broken out into its own function.
205
206                 # TODO be aware of date formats, but this requires pre-parsing
207                 # as ES will simply reject anything with an invalid date.
208                 my $es_type = 'text';
209                 if ($type eq 'boolean') {
210                     $es_type = 'boolean';
211                 } elsif ($type eq 'number' || $type eq 'sum') {
212                     $es_type = 'integer';
213                 } elsif ($type eq 'isbn' || $type eq 'stdno') {
214                     $es_type = 'stdno';
215                 }
216
217                 $mappings->{data}{properties}{$name} = _get_elasticsearch_mapping('search', $es_type);
218
219                 if ($facet) {
220                     $mappings->{data}{properties}{ $name . '__facet' } = _get_elasticsearch_mapping('facet', $es_type);
221                 }
222                 if ($suggestible) {
223                     $mappings->{data}{properties}{ $name . '__suggestion' } = _get_elasticsearch_mapping('suggestible', $es_type);
224                 }
225                 # Sort is a bit special as it can be true, false, undef.
226                 # We care about "true" or "undef",
227                 # "undef" means to do the default thing, which is make it sortable.
228                 if (!defined $sort || $sort) {
229                     $mappings->{data}{properties}{ $name . '__sort' } = _get_elasticsearch_mapping('sort', $es_type);
230                     $sort_fields{$self->index}{$name} = 1;
231                 }
232             }
233         );
234         $all_mappings{$self->index} = $mappings;
235     }
236     $self->sort_fields(\%{$sort_fields{$self->index}});
237
238     return $all_mappings{$self->index};
239 }
240
241 =head2 _get_elasticsearch_mapping
242
243 Get the Elasticsearch mappings for the given purpose and data type.
244
245 $mapping = _get_elasticsearch_mapping('search', 'text');
246
247 =cut
248
249 sub _get_elasticsearch_mapping {
250
251     my ( $purpose, $type ) = @_;
252
253     # Use state to speed up repeated calls
254     state $settings = undef;
255     if (!defined $settings) {
256         my $config_file = C4::Context->config('elasticsearch_field_config');
257         $config_file ||= C4::Context->config('intranetdir') . '/admin/searchengine/elasticsearch/field_config.yaml';
258         $settings = LoadFile( $config_file );
259     }
260
261     if (!defined $settings->{$purpose}) {
262         die "Field purpose $purpose not defined in field config";
263     }
264     if ($type eq '') {
265         return $settings->{$purpose};
266     }
267     if (defined $settings->{$purpose}{$type}) {
268         return $settings->{$purpose}{$type};
269     }
270     if (defined $settings->{$purpose}{'default'}) {
271         return $settings->{$purpose}{'default'};
272     }
273     return;
274 }
275
276 sub reset_elasticsearch_mappings {
277     my ( $reset_fields ) = @_;
278     my $mappings_yaml = C4::Context->config('elasticsearch_index_mappings');
279     $mappings_yaml ||= C4::Context->config('intranetdir') . '/admin/searchengine/elasticsearch/mappings.yaml';
280     my $indexes = LoadFile( $mappings_yaml );
281
282     while ( my ( $index_name, $fields ) = each %$indexes ) {
283         while ( my ( $field_name, $data ) = each %$fields ) {
284             my $field_type = $data->{type};
285             my $field_label = $data->{label};
286             my $mappings = $data->{mappings};
287             my $facet_order = $data->{facet_order};
288             my $search_field = Koha::SearchFields->find_or_create({ name => $field_name });
289             $search_field->update(
290                 {
291                     label       => $field_label,
292                     type        => $field_type,
293                     facet_order => $facet_order
294                 }
295             );
296             for my $mapping ( @$mappings ) {
297                 my $marc_field = Koha::SearchMarcMaps->find_or_create({ index_name => $index_name, marc_type => $mapping->{marc_type}, marc_field => $mapping->{marc_field} });
298                 $search_field->add_to_search_marc_maps($marc_field, { facet => $mapping->{facet} || 0, suggestible => $mapping->{suggestible} || 0, sort => $mapping->{sort} } );
299             }
300         }
301     }
302 }
303
304 # This overrides the accessor provided by Class::Accessor so that if
305 # sort_fields isn't set, then it'll generate it.
306 sub sort_fields {
307     my $self = shift;
308     if (@_) {
309         $self->_sort_fields_accessor(@_);
310         return;
311     }
312     my $val = $self->_sort_fields_accessor();
313     return $val if $val;
314
315     # This will populate the accessor as a side effect
316     $self->get_elasticsearch_mappings();
317     return $self->_sort_fields_accessor();
318 }
319
320 =head2 _process_mappings($mappings, $data, $record_document, $altscript)
321
322     $self->_process_mappings($mappings, $marc_field_data, $record_document, 0)
323
324 Process all C<$mappings> targets operating on a specific MARC field C<$data>.
325 Since we group all mappings by MARC field targets C<$mappings> will contain
326 all targets for C<$data> and thus we need to fetch the MARC field only once.
327 C<$mappings> will be applied to C<$record_document> and new field values added.
328 The method has no return value.
329
330 =over 4
331
332 =item C<$mappings>
333
334 Arrayref of mappings containing arrayrefs in the format
335 [C<$target>, C<$options>] where C<$target> is the name of the target field and
336 C<$options> is a hashref containing processing directives for this particular
337 mapping.
338
339 =item C<$data>
340
341 The source data from a MARC record field.
342
343 =item C<$record_document>
344
345 Hashref representing the Elasticsearch document on which mappings should be
346 applied.
347
348 =item C<$altscript>
349
350 A boolean value indicating whether an alternate script presentation is being
351 processed.
352
353 =back
354
355 =cut
356
357 sub _process_mappings {
358     my ($_self, $mappings, $data, $record_document, $altscript) = @_;
359     foreach my $mapping (@{$mappings}) {
360         my ($target, $options) = @{$mapping};
361
362         # Don't process sort fields for alternate scripts
363         my $sort = $target =~ /__sort$/;
364         if ($sort && $altscript) {
365             next;
366         }
367
368         # Copy (scalar) data since can have multiple targets
369         # with differing options for (possibly) mutating data
370         # so need a different copy for each
371         my $_data = $data;
372         $record_document->{$target} //= [];
373         if (defined $options->{substr}) {
374             my ($start, $length) = @{$options->{substr}};
375             $_data = length($data) > $start ? substr $data, $start, $length : '';
376         }
377         if (defined $options->{value_callbacks}) {
378             $_data = reduce { $b->($a) } ($_data, @{$options->{value_callbacks}});
379         }
380         if (defined $options->{property}) {
381             $_data = {
382                 $options->{property} => $_data
383             }
384         }
385         push @{$record_document->{$target}}, $_data;
386     }
387 }
388
389 =head2 marc_records_to_documents($marc_records)
390
391     my @record_documents = $self->marc_records_to_documents($marc_records);
392
393 Using mappings stored in database convert C<$marc_records> to Elasticsearch documents.
394
395 Returns array of hash references, representing Elasticsearch documents,
396 acceptable as body payload in C<Search::Elasticsearch> requests.
397
398 =over 4
399
400 =item C<$marc_documents>
401
402 Reference to array of C<MARC::Record> objects to be converted to Elasticsearch documents.
403
404 =back
405
406 =cut
407
408 sub marc_records_to_documents {
409     my ($self, $records) = @_;
410     my $rules = $self->_get_marc_mapping_rules();
411     my $control_fields_rules = $rules->{control_fields};
412     my $data_fields_rules = $rules->{data_fields};
413     my $marcflavour = lc C4::Context->preference('marcflavour');
414
415     my @record_documents;
416
417     foreach my $record (@{$records}) {
418         my $record_document = {};
419         my $mappings = $rules->{leader};
420         if ($mappings) {
421             $self->_process_mappings($mappings, $record->leader(), $record_document, 0);
422         }
423         foreach my $field ($record->fields()) {
424             if ($field->is_control_field()) {
425                 my $mappings = $control_fields_rules->{$field->tag()};
426                 if ($mappings) {
427                     $self->_process_mappings($mappings, $field->data(), $record_document, 0);
428                 }
429             }
430             else {
431                 my $tag = $field->tag();
432                 # Handle alternate scripts in MARC 21
433                 my $altscript = 0;
434                 if ($marcflavour eq 'marc21' && $tag eq '880') {
435                     my $sub6 = $field->subfield('6');
436                     if ($sub6 =~ /^(...)-\d+/) {
437                         $tag = $1;
438                         $altscript = 1;
439                     }
440                 }
441
442                 my $data_field_rules = $data_fields_rules->{$tag};
443
444                 if ($data_field_rules) {
445                     my $subfields_mappings = $data_field_rules->{subfields};
446                     my $wildcard_mappings = $subfields_mappings->{'*'};
447                     foreach my $subfield ($field->subfields()) {
448                         my ($code, $data) = @{$subfield};
449                         my $mappings = $subfields_mappings->{$code} // [];
450                         if ($wildcard_mappings) {
451                             $mappings = [@{$mappings}, @{$wildcard_mappings}];
452                         }
453                         if (@{$mappings}) {
454                             $self->_process_mappings($mappings, $data, $record_document, $altscript);
455                         }
456                     }
457
458                     my $subfields_join_mappings = $data_field_rules->{subfields_join};
459                     if ($subfields_join_mappings) {
460                         foreach my $subfields_group (keys %{$subfields_join_mappings}) {
461                             # Map each subfield to values, remove empty values, join with space
462                             my $data = join(
463                                 ' ',
464                                 grep(
465                                     $_,
466                                     map { join(' ', $field->subfield($_)) } split(//, $subfields_group)
467                                 )
468                             );
469                             if ($data) {
470                                 $self->_process_mappings($subfields_join_mappings->{$subfields_group}, $data, $record_document, $altscript);
471                             }
472                         }
473                     }
474                 }
475             }
476         }
477         foreach my $field (keys %{$rules->{defaults}}) {
478             unless (defined $record_document->{$field}) {
479                 $record_document->{$field} = $rules->{defaults}->{$field};
480             }
481         }
482         foreach my $field (@{$rules->{sum}}) {
483             if (defined $record_document->{$field}) {
484                 # TODO: validate numeric? filter?
485                 # TODO: Or should only accept fields without nested values?
486                 # TODO: Quick and dirty, improve if needed
487                 $record_document->{$field} = sum0(grep { !ref($_) && m/\d+(\.\d+)?/} @{$record_document->{$field}});
488             }
489         }
490         # Index all applicable ISBN forms (ISBN-10 and ISBN-13 with and without dashes)
491         foreach my $field (@{$rules->{isbn}}) {
492             if (defined $record_document->{$field}) {
493                 my @isbns = ();
494                 foreach my $input_isbn (@{$record_document->{$field}}) {
495                     my $isbn = Business::ISBN->new($input_isbn);
496                     if (defined $isbn && $isbn->is_valid) {
497                         my $isbn13 = $isbn->as_isbn13->as_string;
498                         push @isbns, $isbn13;
499                         $isbn13 =~ s/\-//g;
500                         push @isbns, $isbn13;
501
502                         my $isbn10 = $isbn->as_isbn10;
503                         if ($isbn10) {
504                             $isbn10 = $isbn10->as_string;
505                             push @isbns, $isbn10;
506                             $isbn10 =~ s/\-//g;
507                             push @isbns, $isbn10;
508                         }
509                     } else {
510                         push @isbns, $input_isbn;
511                     }
512                 }
513                 $record_document->{$field} = \@isbns;
514             }
515         }
516
517         # Remove duplicate values and collapse sort fields
518         foreach my $field (keys %{$record_document}) {
519             if (ref($record_document->{$field}) eq 'ARRAY') {
520                 @{$record_document->{$field}} = do {
521                     my %seen;
522                     grep { !$seen{ref($_) eq 'HASH' && defined $_->{input} ? $_->{input} : $_}++ } @{$record_document->{$field}};
523                 };
524                 if ($field =~ /__sort$/) {
525                     # Make sure to keep the sort field length sensible. 255 was chosen as a nice round value.
526                     $record_document->{$field} = [substr(join(' ', @{$record_document->{$field}}), 0, 255)];
527                 }
528             }
529         }
530
531         # TODO: Perhaps should check if $records_document non empty, but really should never be the case
532         $record->encoding('UTF-8');
533         my @warnings;
534         {
535             # Temporarily intercept all warn signals (MARC::Record carps when record length > 99999)
536             local $SIG{__WARN__} = sub {
537                 push @warnings, $_[0];
538             };
539             $record_document->{'marc_data'} = encode_base64(encode('UTF-8', $record->as_usmarc()));
540         }
541         if (@warnings) {
542             # Suppress warnings if record length exceeded
543             unless (substr($record->leader(), 0, 5) eq '99999') {
544                 foreach my $warning (@warnings) {
545                     carp $warning;
546                 }
547             }
548             $record_document->{'marc_data'} = $record->as_xml_record($marcflavour);
549             $record_document->{'marc_format'} = 'MARCXML';
550         }
551         else {
552             $record_document->{'marc_format'} = 'base64ISO2709';
553         }
554         my $id = $record->subfield('999', 'c');
555         push @record_documents, [$id, $record_document];
556     }
557     return \@record_documents;
558 }
559
560 =head2 _field_mappings($facet, $suggestible, $sort, $target_name, $target_type, $range)
561
562     my @mappings = _field_mappings($facet, $suggestible, $sort, $target_name, $target_type, $range)
563
564 Get mappings, an internal data structure later used by
565 L<_process_mappings($mappings, $data, $record_document, $altscript)> to process MARC target
566 data for a MARC mapping.
567
568 The returned C<$mappings> is not to to be confused with mappings provided by
569 C<_foreach_mapping>, rather this sub accepts properties from a mapping as
570 provided by C<_foreach_mapping> and expands it to this internal data structure.
571 In the caller context (C<_get_marc_mapping_rules>) the returned C<@mappings>
572 is then applied to each MARC target (leader, control field data, subfield or
573 joined subfields) and integrated into the mapping rules data structure used in
574 C<marc_records_to_documents> to transform MARC records into Elasticsearch
575 documents.
576
577 =over 4
578
579 =item C<$facet>
580
581 Boolean indicating whether to create a facet field for this mapping.
582
583 =item C<$suggestible>
584
585 Boolean indicating whether to create a suggestion field for this mapping.
586
587 =item C<$sort>
588
589 Boolean indicating whether to create a sort field for this mapping.
590
591 =item C<$target_name>
592
593 Elasticsearch document target field name.
594
595 =item C<$target_type>
596
597 Elasticsearch document target field type.
598
599 =item C<$range>
600
601 An optional range as a string in the format "<START>-<END>" or "<START>",
602 where "<START>" and "<END>" are integers specifying a range that will be used
603 for extracting a substring from MARC data as Elasticsearch field target value.
604
605 The first character position is "1", and the range is inclusive,
606 so "1-3" means the first three characters of MARC data.
607
608 If only "<START>" is provided only one character at position "<START>" will
609 be extracted.
610
611 =back
612
613 =cut
614
615 sub _field_mappings {
616     my ($_self, $facet, $suggestible, $sort, $target_name, $target_type, $range) = @_;
617     my %mapping_defaults = ();
618     my @mappings;
619
620     my $substr_args = undef;
621     if ($range) {
622         # TODO: use value_callback instead?
623         my ($start, $end) = map(int, split /-/, $range, 2);
624         $substr_args = [$start];
625         push @{$substr_args}, (defined $end ? $end - $start + 1 : 1);
626     }
627     my $default_options = {};
628     if ($substr_args) {
629         $default_options->{substr} = $substr_args;
630     }
631
632     # TODO: Should probably have per type value callback/hook
633     # but hard code for now
634     if ($target_type eq 'boolean') {
635         $default_options->{value_callbacks} //= [];
636         push @{$default_options->{value_callbacks}}, sub {
637             my ($value) = @_;
638             # Trim whitespace at both ends
639             $value =~ s/^\s+|\s+$//g;
640             return $value ? 'true' : 'false';
641         };
642     }
643
644     my $mapping = [$target_name, $default_options];
645     push @mappings, $mapping;
646
647     my @suffixes = ();
648     push @suffixes, 'facet' if $facet;
649     push @suffixes, 'suggestion' if $suggestible;
650     push @suffixes, 'sort' if !defined $sort || $sort;
651
652     foreach my $suffix (@suffixes) {
653         my $mapping = ["${target_name}__$suffix"];
654         # TODO: Hack, fix later in less hideous manner
655         if ($suffix eq 'suggestion') {
656             push @{$mapping}, {%{$default_options}, property => 'input'};
657         }
658         else {
659             push @{$mapping}, $default_options;
660         }
661         push @mappings, $mapping;
662     }
663     return @mappings;
664 };
665
666 =head2 _get_marc_mapping_rules
667
668     my $mapping_rules = $self->_get_marc_mapping_rules()
669
670 Generates rules from mappings stored in database for MARC records to Elasticsearch JSON document conversion.
671
672 Since field retrieval is slow in C<MARC::Records> (all fields are itereted through for
673 each call to C<MARC::Record>->field) we create an optimized structure of mapping
674 rules keyed by MARC field tags holding all the mapping rules for that particular tag.
675
676 We can then iterate through all MARC fields for each record and apply all relevant
677 rules once per fields instead of retreiving fields multiple times for each mapping rule
678 which is terribly slow.
679
680 =cut
681
682 # TODO: This structure can be used for processing multiple MARC::Records so is currently
683 # rebuilt for each batch. Since it is cacheable it could also be stored in an in
684 # memory cache which it is currently not. The performance gain of caching
685 # would probably be marginal, but to do this could be a further improvement.
686
687 sub _get_marc_mapping_rules {
688     my ($self) = @_;
689     my $marcflavour = lc C4::Context->preference('marcflavour');
690     my $field_spec_regexp = qr/^([0-9]{3})([()0-9a-z]+)?(?:_\/(\d+(?:-\d+)?))?$/;
691     my $leader_regexp = qr/^leader(?:_\/(\d+(?:-\d+)?))?$/;
692     my $rules = {
693         'leader' => [],
694         'control_fields' => {},
695         'data_fields' => {},
696         'sum' => [],
697         'isbn' => [],
698         'defaults' => {}
699     };
700
701     $self->_foreach_mapping(sub {
702         my ($name, $type, $facet, $suggestible, $sort, $marc_type, $marc_field) = @_;
703         return if $marc_type ne $marcflavour;
704
705         if ($type eq 'sum') {
706             push @{$rules->{sum}}, $name;
707         }
708         elsif ($type eq 'isbn') {
709             push @{$rules->{isbn}}, $name;
710         }
711         elsif ($type eq 'boolean') {
712             # boolean gets special handling, if value doesn't exist for a field,
713             # it is set to false
714             $rules->{defaults}->{$name} = 'false';
715         }
716
717         if ($marc_field =~ $field_spec_regexp) {
718             my $field_tag = $1;
719
720             my @subfields;
721             my @subfield_groups;
722             # Parse and separate subfields form subfield groups
723             if (defined $2) {
724                 my $subfield_group = '';
725                 my $open_group = 0;
726
727                 foreach my $token (split //, $2) {
728                     if ($token eq "(") {
729                         if ($open_group) {
730                             Koha::Exceptions::Elasticsearch::MARCFieldExprParseError->throw(
731                                 "Unmatched opening parenthesis for $marc_field"
732                             );
733                         }
734                         else {
735                             $open_group = 1;
736                         }
737                     }
738                     elsif ($token eq ")") {
739                         if ($open_group) {
740                             if ($subfield_group) {
741                                 push @subfield_groups, $subfield_group;
742                                 $subfield_group = '';
743                             }
744                             $open_group = 0;
745                         }
746                         else {
747                             Koha::Exceptions::Elasticsearch::MARCFieldExprParseError->throw(
748                                 "Unmatched closing parenthesis for $marc_field"
749                             );
750                         }
751                     }
752                     elsif ($open_group) {
753                         $subfield_group .= $token;
754                     }
755                     else {
756                         push @subfields, $token;
757                     }
758                 }
759             }
760             else {
761                 push @subfields, '*';
762             }
763
764             my $range = defined $3 ? $3 : undef;
765             my @mappings = $self->_field_mappings($facet, $suggestible, $sort, $name, $type, $range);
766
767             if ($field_tag < 10) {
768                 $rules->{control_fields}->{$field_tag} //= [];
769                 push @{$rules->{control_fields}->{$field_tag}}, @mappings;
770             }
771             else {
772                 $rules->{data_fields}->{$field_tag} //= {};
773                 foreach my $subfield (@subfields) {
774                     $rules->{data_fields}->{$field_tag}->{subfields}->{$subfield} //= [];
775                     push @{$rules->{data_fields}->{$field_tag}->{subfields}->{$subfield}}, @mappings;
776                 }
777                 foreach my $subfield_group (@subfield_groups) {
778                     $rules->{data_fields}->{$field_tag}->{subfields_join}->{$subfield_group} //= [];
779                     push @{$rules->{data_fields}->{$field_tag}->{subfields_join}->{$subfield_group}}, @mappings;
780                 }
781             }
782         }
783         elsif ($marc_field =~ $leader_regexp) {
784             my $range = defined $1 ? $1 : undef;
785             my @mappings = $self->_field_mappings($facet, $suggestible, $sort, $name, $type, $range);
786             push @{$rules->{leader}}, @mappings;
787         }
788         else {
789             Koha::Exceptions::Elasticsearch::MARCFieldExprParseError->throw(
790                 "Invalid MARC field expression: $marc_field"
791             );
792         }
793     });
794     return $rules;
795 }
796
797 =head2 _foreach_mapping
798
799     $self->_foreach_mapping(
800         sub {
801             my ( $name, $type, $facet, $suggestible, $sort, $marc_type,
802                 $marc_field )
803               = @_;
804             return unless $marc_type eq 'marc21';
805             print "Data comes from: " . $marc_field . "\n";
806         }
807     );
808
809 This allows you to apply a function to each entry in the elasticsearch mappings
810 table, in order to build the mappings for whatever is needed.
811
812 In the provided function, the files are:
813
814 =over 4
815
816 =item C<$name>
817
818 The field name for elasticsearch (corresponds to the 'mapping' column in the
819 database.
820
821 =item C<$type>
822
823 The type for this value, e.g. 'string'.
824
825 =item C<$facet>
826
827 True if this value should be facetised. This only really makes sense if the
828 field is understood by the facet processing code anyway.
829
830 =item C<$sort>
831
832 True if this is a field that a) needs special sort handling, and b) if it
833 should be sorted on. False if a) but not b). Undef if not a). This allows,
834 for example, author to be sorted on but not everything marked with "author"
835 to be included in that sort.
836
837 =item C<$marc_type>
838
839 A string that indicates the MARC type that this mapping is for, e.g. 'marc21',
840 'unimarc', 'normarc'.
841
842 =item C<$marc_field>
843
844 A string that describes the MARC field that contains the data to extract.
845 These are of a form suited to Catmandu's MARC fixers.
846
847 =back
848
849 =cut
850
851 sub _foreach_mapping {
852     my ( $self, $sub ) = @_;
853
854     # TODO use a caching framework here
855     my $search_fields = Koha::Database->schema->resultset('SearchField')->search(
856         {
857             'search_marc_map.index_name' => $self->index,
858         },
859         {   join => { search_marc_to_fields => 'search_marc_map' },
860             '+select' => [
861                 'search_marc_to_fields.facet',
862                 'search_marc_to_fields.suggestible',
863                 'search_marc_to_fields.sort',
864                 'search_marc_map.marc_type',
865                 'search_marc_map.marc_field',
866             ],
867             '+as'     => [
868                 'facet',
869                 'suggestible',
870                 'sort',
871                 'marc_type',
872                 'marc_field',
873             ],
874         }
875     );
876
877     while ( my $search_field = $search_fields->next ) {
878         $sub->(
879             # Force lower case on indexed field names for case insensitive
880             # field name searches
881             lc($search_field->name),
882             $search_field->type,
883             $search_field->get_column('facet'),
884             $search_field->get_column('suggestible'),
885             $search_field->get_column('sort'),
886             $search_field->get_column('marc_type'),
887             $search_field->get_column('marc_field'),
888         );
889     }
890 }
891
892 =head2 process_error
893
894     die process_error($@);
895
896 This parses an Elasticsearch error message and produces a human-readable
897 result from it. This result is probably missing all the useful information
898 that you might want in diagnosing an issue, so the warning is also logged.
899
900 Note that currently the resulting message is not internationalised. This
901 will happen eventually by some method or other.
902
903 =cut
904
905 sub process_error {
906     my ($self, $msg) = @_;
907
908     warn $msg; # simple logging
909
910     # This is super-primitive
911     return "Unable to understand your search query, please rephrase and try again.\n" if $msg =~ /ParseException/;
912
913     return "Unable to perform your search. Please try again.\n";
914 }
915
916 =head2 _read_configuration
917
918     my $conf = _read_configuration();
919
920 Reads the I<configuration file> and returns a hash structure with the
921 configuration information. It raises an exception if mandatory entries
922 are missing.
923
924 The hashref structure has the following form:
925
926     {
927         'nodes' => ['127.0.0.1:9200', 'anotherserver:9200'],
928         'index_name' => 'koha_instance',
929     }
930
931 This is configured by the following in the C<config> block in koha-conf.xml:
932
933     <elasticsearch>
934         <server>127.0.0.1:9200</server>
935         <server>anotherserver:9200</server>
936         <index_name>koha_instance</index_name>
937     </elasticsearch>
938
939 =cut
940
941 sub _read_configuration {
942
943     my $configuration;
944
945     my $conf = C4::Context->config('elasticsearch');
946     Koha::Exceptions::Config::MissingEntry->throw(
947         "Missing 'elasticsearch' block in config file")
948       unless defined $conf;
949
950     if ( $conf && $conf->{server} ) {
951         my $nodes = $conf->{server};
952         if ( ref($nodes) eq 'ARRAY' ) {
953             $configuration->{nodes} = $nodes;
954         }
955         else {
956             $configuration->{nodes} = [$nodes];
957         }
958     }
959     else {
960         Koha::Exceptions::Config::MissingEntry->throw(
961             "Missing 'server' entry in config file for elasticsearch");
962     }
963
964     if ( defined $conf->{index_name} ) {
965         $configuration->{index_name} = $conf->{index_name};
966     }
967     else {
968         Koha::Exceptions::Config::MissingEntry->throw(
969             "Missing 'index_name' entry in config file for elasticsearch");
970     }
971
972     return $configuration;
973 }
974
975 =head2 get_facetable_fields
976
977 my @facetable_fields = Koha::SearchEngine::Elasticsearch->get_facetable_fields();
978
979 Returns the list of Koha::SearchFields marked to be faceted in the ES configuration
980
981 =cut
982
983 sub get_facetable_fields {
984     my ($self) = @_;
985
986     # These should correspond to the ES field names, as opposed to the CCL
987     # things that zebra uses.
988     my @search_field_names = qw( author itype location su-geo title-series subject ccode holdingbranch homebranch );
989     my @faceted_fields = Koha::SearchFields->search(
990         { name => { -in => \@search_field_names }, facet_order => { '!=' => undef } }, { order_by => ['facet_order'] }
991     );
992     my @not_faceted_fields = Koha::SearchFields->search(
993         { name => { -in => \@search_field_names }, facet_order => undef }, { order_by => ['facet_order'] }
994     );
995     # This could certainly be improved
996     return ( @faceted_fields, @not_faceted_fields );
997 }
998
999 1;
1000
1001 __END__
1002
1003 =head1 AUTHOR
1004
1005 =over 4
1006
1007 =item Chris Cormack C<< <chrisc@catalyst.net.nz> >>
1008
1009 =item Robin Sheat C<< <robin@catalyst.net.nz> >>
1010
1011 =item Jonathan Druart C<< <jonathan.druart@bugs.koha-community.org> >>
1012
1013 =back
1014
1015 =cut