Bug 22339: Fix search field mappings of MARC fixed fields
[koha-equinox.git] / Koha / SearchEngine / Elasticsearch.pm
1 package Koha::SearchEngine::Elasticsearch;
2
3 # Copyright 2015 Catalyst IT
4 #
5 # This file is part of Koha.
6 #
7 # Koha is free software; you can redistribute it and/or modify it under the
8 # terms of the GNU General Public License as published by the Free Software
9 # Foundation; either version 3 of the License, or (at your option) any later
10 # version.
11 #
12 # Koha is distributed in the hope that it will be useful, but WITHOUT ANY
13 # WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
14 # A PARTICULAR PURPOSE.  See the GNU General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License along
17 # with Koha; if not, write to the Free Software Foundation, Inc.,
18 # 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
19
20 use base qw(Class::Accessor);
21
22 use C4::Context;
23
24 use Koha::Database;
25 use Koha::Exceptions::Config;
26 use Koha::SearchFields;
27 use Koha::SearchMarcMaps;
28
29 use Carp;
30 use Clone qw(clone);
31 use JSON;
32 use Modern::Perl;
33 use Readonly;
34 use Search::Elasticsearch;
35 use Try::Tiny;
36 use YAML::Syck;
37
38 use List::Util qw( sum0 reduce );
39 use MARC::File::XML;
40 use MIME::Base64;
41 use Encode qw(encode);
42 use Business::ISBN;
43
44 __PACKAGE__->mk_ro_accessors(qw( index ));
45 __PACKAGE__->mk_accessors(qw( sort_fields ));
46
47 # Constants to refer to the standard index names
48 Readonly our $BIBLIOS_INDEX     => 'biblios';
49 Readonly our $AUTHORITIES_INDEX => 'authorities';
50
51 =head1 NAME
52
53 Koha::SearchEngine::Elasticsearch - Base module for things using elasticsearch
54
55 =head1 ACCESSORS
56
57 =over 4
58
59 =item index
60
61 The name of the index to use, generally 'biblios' or 'authorities'.
62
63 =back
64
65 =head1 FUNCTIONS
66
67 =cut
68
69 sub new {
70     my $class = shift @_;
71     my $self = $class->SUPER::new(@_);
72     # Check for a valid index
73     Koha::Exceptions::MissingParameter->throw('No index name provided') unless $self->index;
74     return $self;
75 }
76
77 =head2 get_elasticsearch
78
79     my $elasticsearch_client = $self->get_elasticsearch();
80
81 Returns a C<Search::Elasticsearch> client. The client is cached on a C<Koha::SearchEngine::ElasticSearch>
82 instance level and will be reused if method is called multiple times.
83
84 =cut
85
86 sub get_elasticsearch {
87     my $self = shift @_;
88     unless (defined $self->{elasticsearch}) {
89         my $conf = $self->get_elasticsearch_params();
90         $self->{elasticsearch} = Search::Elasticsearch->new($conf);
91     }
92     return $self->{elasticsearch};
93 }
94
95 =head2 get_elasticsearch_params
96
97     my $params = $self->get_elasticsearch_params();
98
99 This provides a hashref that contains the parameters for connecting to the
100 ElasicSearch servers, in the form:
101
102     {
103         'nodes' => ['127.0.0.1:9200', 'anotherserver:9200'],
104         'index_name' => 'koha_instance_index',
105     }
106
107 This is configured by the following in the C<config> block in koha-conf.xml:
108
109     <elasticsearch>
110         <server>127.0.0.1:9200</server>
111         <server>anotherserver:9200</server>
112         <index_name>koha_instance</index_name>
113     </elasticsearch>
114
115 =cut
116
117 sub get_elasticsearch_params {
118     my ($self) = @_;
119
120     # Copy the hash so that we're not modifying the original
121     my $conf = C4::Context->config('elasticsearch');
122     die "No 'elasticsearch' block is defined in koha-conf.xml.\n" if ( !$conf );
123     my $es = { %{ $conf } };
124
125     # Helpfully, the multiple server lines end up in an array for us anyway
126     # if there are multiple ones, but not if there's only one.
127     my $server = $es->{server};
128     delete $es->{server};
129     if ( ref($server) eq 'ARRAY' ) {
130
131         # store it called 'nodes' (which is used by newer Search::Elasticsearch)
132         $es->{nodes} = $server;
133     }
134     elsif ($server) {
135         $es->{nodes} = [$server];
136     }
137     else {
138         die "No elasticsearch servers were specified in koha-conf.xml.\n";
139     }
140     die "No elasticsearch index_name was specified in koha-conf.xml.\n"
141       if ( !$es->{index_name} );
142     # Append the name of this particular index to our namespace
143     $es->{index_name} .= '_' . $self->index;
144
145     $es->{key_prefix} = 'es_';
146     $es->{client} //= '5_0::Direct';
147     $es->{cxn_pool} //= 'Sniff';
148     $es->{request_timeout} //= 60;
149
150     return $es;
151 }
152
153 =head2 get_elasticsearch_settings
154
155     my $settings = $self->get_elasticsearch_settings();
156
157 This provides the settings provided to Elasticsearch when an index is created.
158 These can do things like define tokenization methods.
159
160 A hashref containing the settings is returned.
161
162 =cut
163
164 sub get_elasticsearch_settings {
165     my ($self) = @_;
166
167     # Use state to speed up repeated calls
168     state $settings = undef;
169     if (!defined $settings) {
170         my $config_file = C4::Context->config('elasticsearch_index_config');
171         $config_file ||= C4::Context->config('intranetdir') . '/admin/searchengine/elasticsearch/index_config.yaml';
172         $settings = LoadFile( $config_file );
173     }
174
175     return $settings;
176 }
177
178 =head2 get_elasticsearch_mappings
179
180     my $mappings = $self->get_elasticsearch_mappings();
181
182 This provides the mappings that get passed to Elasticsearch when an index is
183 created.
184
185 =cut
186
187 sub get_elasticsearch_mappings {
188     my ($self) = @_;
189
190     # Use state to speed up repeated calls
191     state %all_mappings;
192     state %sort_fields;
193
194     if (!defined $all_mappings{$self->index}) {
195         $sort_fields{$self->index} = {};
196         # Clone the general mapping to break ties with the original hash
197         my $mappings = {
198             data => clone(_get_elasticsearch_field_config('general', ''))
199         };
200         my $marcflavour = lc C4::Context->preference('marcflavour');
201         $self->_foreach_mapping(
202             sub {
203                 my ( $name, $type, $facet, $suggestible, $sort, $marc_type ) = @_;
204
205                 return if $marc_type ne $marcflavour;
206                 # TODO if this gets any sort of complexity to it, it should
207                 # be broken out into its own function.
208
209                 # TODO be aware of date formats, but this requires pre-parsing
210                 # as ES will simply reject anything with an invalid date.
211                 my $es_type = 'text';
212                 if ($type eq 'boolean') {
213                     $es_type = 'boolean';
214                 } elsif ($type eq 'number' || $type eq 'sum') {
215                     $es_type = 'integer';
216                 } elsif ($type eq 'isbn' || $type eq 'stdno') {
217                     $es_type = 'stdno';
218                 }
219
220                 $mappings->{data}{properties}{$name} = _get_elasticsearch_field_config('search', $es_type);
221
222                 if ($facet) {
223                     $mappings->{data}{properties}{ $name . '__facet' } = _get_elasticsearch_field_config('facet', $es_type);
224                 }
225                 if ($suggestible) {
226                     $mappings->{data}{properties}{ $name . '__suggestion' } = _get_elasticsearch_field_config('suggestible', $es_type);
227                 }
228                 # Sort is a bit special as it can be true, false, undef.
229                 # We care about "true" or "undef",
230                 # "undef" means to do the default thing, which is make it sortable.
231                 if (!defined $sort || $sort) {
232                     $mappings->{data}{properties}{ $name . '__sort' } = _get_elasticsearch_field_config('sort', $es_type);
233                     $sort_fields{$self->index}{$name} = 1;
234                 }
235             }
236         );
237         $all_mappings{$self->index} = $mappings;
238     }
239     $self->sort_fields(\%{$sort_fields{$self->index}});
240
241     return $all_mappings{$self->index};
242 }
243
244 =head2 _get_elasticsearch_field_config
245
246 Get the Elasticsearch field config for the given purpose and data type.
247
248 $mapping = _get_elasticsearch_field_config('search', 'text');
249
250 =cut
251
252 sub _get_elasticsearch_field_config {
253
254     my ( $purpose, $type ) = @_;
255
256     # Use state to speed up repeated calls
257     state $settings = undef;
258     if (!defined $settings) {
259         my $config_file = C4::Context->config('elasticsearch_field_config');
260         $config_file ||= C4::Context->config('intranetdir') . '/admin/searchengine/elasticsearch/field_config.yaml';
261         $settings = LoadFile( $config_file );
262     }
263
264     if (!defined $settings->{$purpose}) {
265         die "Field purpose $purpose not defined in field config";
266     }
267     if ($type eq '') {
268         return $settings->{$purpose};
269     }
270     if (defined $settings->{$purpose}{$type}) {
271         return $settings->{$purpose}{$type};
272     }
273     if (defined $settings->{$purpose}{'default'}) {
274         return $settings->{$purpose}{'default'};
275     }
276     return;
277 }
278
279 sub reset_elasticsearch_mappings {
280     my ( $reset_fields ) = @_;
281     my $mappings_yaml = C4::Context->config('elasticsearch_index_mappings');
282     $mappings_yaml ||= C4::Context->config('intranetdir') . '/admin/searchengine/elasticsearch/mappings.yaml';
283     my $indexes = LoadFile( $mappings_yaml );
284
285     while ( my ( $index_name, $fields ) = each %$indexes ) {
286         while ( my ( $field_name, $data ) = each %$fields ) {
287             my $field_type = $data->{type};
288             my $field_label = $data->{label};
289             my $mappings = $data->{mappings};
290             my $facet_order = $data->{facet_order};
291             my $search_field = Koha::SearchFields->find_or_create({
292                 name  => $field_name,
293                 label => $field_label,
294                 type  => $field_type,
295             },
296             {
297                 key => 'name'
298             });
299             $search_field->update(
300                 {
301                     facet_order => $facet_order
302                 }
303             );
304             for my $mapping ( @$mappings ) {
305                 my $marc_field = Koha::SearchMarcMaps->find_or_create({ index_name => $index_name, marc_type => $mapping->{marc_type}, marc_field => $mapping->{marc_field} });
306                 $search_field->add_to_search_marc_maps($marc_field, { facet => $mapping->{facet} || 0, suggestible => $mapping->{suggestible} || 0, sort => $mapping->{sort} } );
307             }
308         }
309     }
310 }
311
312 # This overrides the accessor provided by Class::Accessor so that if
313 # sort_fields isn't set, then it'll generate it.
314 sub sort_fields {
315     my $self = shift;
316     if (@_) {
317         $self->_sort_fields_accessor(@_);
318         return;
319     }
320     my $val = $self->_sort_fields_accessor();
321     return $val if $val;
322
323     # This will populate the accessor as a side effect
324     $self->get_elasticsearch_mappings();
325     return $self->_sort_fields_accessor();
326 }
327
328 =head2 _process_mappings($mappings, $data, $record_document, $altscript)
329
330     $self->_process_mappings($mappings, $marc_field_data, $record_document, 0)
331
332 Process all C<$mappings> targets operating on a specific MARC field C<$data>.
333 Since we group all mappings by MARC field targets C<$mappings> will contain
334 all targets for C<$data> and thus we need to fetch the MARC field only once.
335 C<$mappings> will be applied to C<$record_document> and new field values added.
336 The method has no return value.
337
338 =over 4
339
340 =item C<$mappings>
341
342 Arrayref of mappings containing arrayrefs in the format
343 [C<$target>, C<$options>] where C<$target> is the name of the target field and
344 C<$options> is a hashref containing processing directives for this particular
345 mapping.
346
347 =item C<$data>
348
349 The source data from a MARC record field.
350
351 =item C<$record_document>
352
353 Hashref representing the Elasticsearch document on which mappings should be
354 applied.
355
356 =item C<$altscript>
357
358 A boolean value indicating whether an alternate script presentation is being
359 processed.
360
361 =back
362
363 =cut
364
365 sub _process_mappings {
366     my ($_self, $mappings, $data, $record_document, $altscript) = @_;
367     foreach my $mapping (@{$mappings}) {
368         my ($target, $options) = @{$mapping};
369
370         # Don't process sort fields for alternate scripts
371         my $sort = $target =~ /__sort$/;
372         if ($sort && $altscript) {
373             next;
374         }
375
376         # Copy (scalar) data since can have multiple targets
377         # with differing options for (possibly) mutating data
378         # so need a different copy for each
379         my $_data = $data;
380         $record_document->{$target} //= [];
381         if (defined $options->{substr}) {
382             my ($start, $length) = @{$options->{substr}};
383             $_data = length($data) > $start ? substr $data, $start, $length : '';
384         }
385         if (defined $options->{value_callbacks}) {
386             $_data = reduce { $b->($a) } ($_data, @{$options->{value_callbacks}});
387         }
388         if (defined $options->{property}) {
389             $_data = {
390                 $options->{property} => $_data
391             }
392         }
393         push @{$record_document->{$target}}, $_data;
394     }
395 }
396
397 =head2 marc_records_to_documents($marc_records)
398
399     my @record_documents = $self->marc_records_to_documents($marc_records);
400
401 Using mappings stored in database convert C<$marc_records> to Elasticsearch documents.
402
403 Returns array of hash references, representing Elasticsearch documents,
404 acceptable as body payload in C<Search::Elasticsearch> requests.
405
406 =over 4
407
408 =item C<$marc_documents>
409
410 Reference to array of C<MARC::Record> objects to be converted to Elasticsearch documents.
411
412 =back
413
414 =cut
415
416 sub marc_records_to_documents {
417     my ($self, $records) = @_;
418     my $rules = $self->_get_marc_mapping_rules();
419     my $control_fields_rules = $rules->{control_fields};
420     my $data_fields_rules = $rules->{data_fields};
421     my $marcflavour = lc C4::Context->preference('marcflavour');
422
423     my @record_documents;
424
425     foreach my $record (@{$records}) {
426         my $record_document = {};
427         my $mappings = $rules->{leader};
428         if ($mappings) {
429             $self->_process_mappings($mappings, $record->leader(), $record_document, 0);
430         }
431         foreach my $field ($record->fields()) {
432             if ($field->is_control_field()) {
433                 my $mappings = $control_fields_rules->{$field->tag()};
434                 if ($mappings) {
435                     $self->_process_mappings($mappings, $field->data(), $record_document, 0);
436                 }
437             }
438             else {
439                 my $tag = $field->tag();
440                 # Handle alternate scripts in MARC 21
441                 my $altscript = 0;
442                 if ($marcflavour eq 'marc21' && $tag eq '880') {
443                     my $sub6 = $field->subfield('6');
444                     if ($sub6 =~ /^(...)-\d+/) {
445                         $tag = $1;
446                         $altscript = 1;
447                     }
448                 }
449
450                 my $data_field_rules = $data_fields_rules->{$tag};
451
452                 if ($data_field_rules) {
453                     my $subfields_mappings = $data_field_rules->{subfields};
454                     my $wildcard_mappings = $subfields_mappings->{'*'};
455                     foreach my $subfield ($field->subfields()) {
456                         my ($code, $data) = @{$subfield};
457                         my $mappings = $subfields_mappings->{$code} // [];
458                         if ($wildcard_mappings) {
459                             $mappings = [@{$mappings}, @{$wildcard_mappings}];
460                         }
461                         if (@{$mappings}) {
462                             $self->_process_mappings($mappings, $data, $record_document, $altscript);
463                         }
464                     }
465
466                     my $subfields_join_mappings = $data_field_rules->{subfields_join};
467                     if ($subfields_join_mappings) {
468                         foreach my $subfields_group (keys %{$subfields_join_mappings}) {
469                             # Map each subfield to values, remove empty values, join with space
470                             my $data = join(
471                                 ' ',
472                                 grep(
473                                     $_,
474                                     map { join(' ', $field->subfield($_)) } split(//, $subfields_group)
475                                 )
476                             );
477                             if ($data) {
478                                 $self->_process_mappings($subfields_join_mappings->{$subfields_group}, $data, $record_document, $altscript);
479                             }
480                         }
481                     }
482                 }
483             }
484         }
485         foreach my $field (keys %{$rules->{defaults}}) {
486             unless (defined $record_document->{$field}) {
487                 $record_document->{$field} = $rules->{defaults}->{$field};
488             }
489         }
490         foreach my $field (@{$rules->{sum}}) {
491             if (defined $record_document->{$field}) {
492                 # TODO: validate numeric? filter?
493                 # TODO: Or should only accept fields without nested values?
494                 # TODO: Quick and dirty, improve if needed
495                 $record_document->{$field} = sum0(grep { !ref($_) && m/\d+(\.\d+)?/} @{$record_document->{$field}});
496             }
497         }
498         # Index all applicable ISBN forms (ISBN-10 and ISBN-13 with and without dashes)
499         foreach my $field (@{$rules->{isbn}}) {
500             if (defined $record_document->{$field}) {
501                 my @isbns = ();
502                 foreach my $input_isbn (@{$record_document->{$field}}) {
503                     my $isbn = Business::ISBN->new($input_isbn);
504                     if (defined $isbn && $isbn->is_valid) {
505                         my $isbn13 = $isbn->as_isbn13->as_string;
506                         push @isbns, $isbn13;
507                         $isbn13 =~ s/\-//g;
508                         push @isbns, $isbn13;
509
510                         my $isbn10 = $isbn->as_isbn10;
511                         if ($isbn10) {
512                             $isbn10 = $isbn10->as_string;
513                             push @isbns, $isbn10;
514                             $isbn10 =~ s/\-//g;
515                             push @isbns, $isbn10;
516                         }
517                     } else {
518                         push @isbns, $input_isbn;
519                     }
520                 }
521                 $record_document->{$field} = \@isbns;
522             }
523         }
524
525         # Remove duplicate values and collapse sort fields
526         foreach my $field (keys %{$record_document}) {
527             if (ref($record_document->{$field}) eq 'ARRAY') {
528                 @{$record_document->{$field}} = do {
529                     my %seen;
530                     grep { !$seen{ref($_) eq 'HASH' && defined $_->{input} ? $_->{input} : $_}++ } @{$record_document->{$field}};
531                 };
532                 if ($field =~ /__sort$/) {
533                     # Make sure to keep the sort field length sensible. 255 was chosen as a nice round value.
534                     $record_document->{$field} = [substr(join(' ', @{$record_document->{$field}}), 0, 255)];
535                 }
536             }
537         }
538
539         # TODO: Perhaps should check if $records_document non empty, but really should never be the case
540         $record->encoding('UTF-8');
541         my @warnings;
542         {
543             # Temporarily intercept all warn signals (MARC::Record carps when record length > 99999)
544             local $SIG{__WARN__} = sub {
545                 push @warnings, $_[0];
546             };
547             $record_document->{'marc_data'} = encode_base64(encode('UTF-8', $record->as_usmarc()));
548         }
549         if (@warnings) {
550             # Suppress warnings if record length exceeded
551             unless (substr($record->leader(), 0, 5) eq '99999') {
552                 foreach my $warning (@warnings) {
553                     carp $warning;
554                 }
555             }
556             $record_document->{'marc_data'} = $record->as_xml_record($marcflavour);
557             $record_document->{'marc_format'} = 'MARCXML';
558         }
559         else {
560             $record_document->{'marc_format'} = 'base64ISO2709';
561         }
562         my $id = $record->subfield('999', 'c');
563         push @record_documents, [$id, $record_document];
564     }
565     return \@record_documents;
566 }
567
568 =head2 _field_mappings($facet, $suggestible, $sort, $target_name, $target_type, $range)
569
570     my @mappings = _field_mappings($facet, $suggestible, $sort, $target_name, $target_type, $range)
571
572 Get mappings, an internal data structure later used by
573 L<_process_mappings($mappings, $data, $record_document, $altscript)> to process MARC target
574 data for a MARC mapping.
575
576 The returned C<$mappings> is not to to be confused with mappings provided by
577 C<_foreach_mapping>, rather this sub accepts properties from a mapping as
578 provided by C<_foreach_mapping> and expands it to this internal data structure.
579 In the caller context (C<_get_marc_mapping_rules>) the returned C<@mappings>
580 is then applied to each MARC target (leader, control field data, subfield or
581 joined subfields) and integrated into the mapping rules data structure used in
582 C<marc_records_to_documents> to transform MARC records into Elasticsearch
583 documents.
584
585 =over 4
586
587 =item C<$facet>
588
589 Boolean indicating whether to create a facet field for this mapping.
590
591 =item C<$suggestible>
592
593 Boolean indicating whether to create a suggestion field for this mapping.
594
595 =item C<$sort>
596
597 Boolean indicating whether to create a sort field for this mapping.
598
599 =item C<$target_name>
600
601 Elasticsearch document target field name.
602
603 =item C<$target_type>
604
605 Elasticsearch document target field type.
606
607 =item C<$range>
608
609 An optional range as a string in the format "<START>-<END>" or "<START>",
610 where "<START>" and "<END>" are integers specifying a range that will be used
611 for extracting a substring from MARC data as Elasticsearch field target value.
612
613 The first character position is "0", and the range is inclusive,
614 so "0-2" means the first three characters of MARC data.
615
616 If only "<START>" is provided only one character at position "<START>" will
617 be extracted.
618
619 =back
620
621 =cut
622
623 sub _field_mappings {
624     my ($_self, $facet, $suggestible, $sort, $target_name, $target_type, $range) = @_;
625     my %mapping_defaults = ();
626     my @mappings;
627
628     my $substr_args = undef;
629     if (defined $range) {
630         # TODO: use value_callback instead?
631         my ($start, $end) = map(int, split /-/, $range, 2);
632         $substr_args = [$start];
633         push @{$substr_args}, (defined $end ? $end - $start + 1 : 1);
634     }
635     my $default_options = {};
636     if ($substr_args) {
637         $default_options->{substr} = $substr_args;
638     }
639
640     # TODO: Should probably have per type value callback/hook
641     # but hard code for now
642     if ($target_type eq 'boolean') {
643         $default_options->{value_callbacks} //= [];
644         push @{$default_options->{value_callbacks}}, sub {
645             my ($value) = @_;
646             # Trim whitespace at both ends
647             $value =~ s/^\s+|\s+$//g;
648             return $value ? 'true' : 'false';
649         };
650     }
651
652     my $mapping = [$target_name, $default_options];
653     push @mappings, $mapping;
654
655     my @suffixes = ();
656     push @suffixes, 'facet' if $facet;
657     push @suffixes, 'suggestion' if $suggestible;
658     push @suffixes, 'sort' if !defined $sort || $sort;
659
660     foreach my $suffix (@suffixes) {
661         my $mapping = ["${target_name}__$suffix"];
662         # TODO: Hack, fix later in less hideous manner
663         if ($suffix eq 'suggestion') {
664             push @{$mapping}, {%{$default_options}, property => 'input'};
665         }
666         else {
667             push @{$mapping}, $default_options;
668         }
669         push @mappings, $mapping;
670     }
671     return @mappings;
672 };
673
674 =head2 _get_marc_mapping_rules
675
676     my $mapping_rules = $self->_get_marc_mapping_rules()
677
678 Generates rules from mappings stored in database for MARC records to Elasticsearch JSON document conversion.
679
680 Since field retrieval is slow in C<MARC::Records> (all fields are itereted through for
681 each call to C<MARC::Record>->field) we create an optimized structure of mapping
682 rules keyed by MARC field tags holding all the mapping rules for that particular tag.
683
684 We can then iterate through all MARC fields for each record and apply all relevant
685 rules once per fields instead of retreiving fields multiple times for each mapping rule
686 which is terribly slow.
687
688 =cut
689
690 # TODO: This structure can be used for processing multiple MARC::Records so is currently
691 # rebuilt for each batch. Since it is cacheable it could also be stored in an in
692 # memory cache which it is currently not. The performance gain of caching
693 # would probably be marginal, but to do this could be a further improvement.
694
695 sub _get_marc_mapping_rules {
696     my ($self) = @_;
697     my $marcflavour = lc C4::Context->preference('marcflavour');
698     my $field_spec_regexp = qr/^([0-9]{3})([()0-9a-z]+)?(?:_\/(\d+(?:-\d+)?))?$/;
699     my $leader_regexp = qr/^leader(?:_\/(\d+(?:-\d+)?))?$/;
700     my $rules = {
701         'leader' => [],
702         'control_fields' => {},
703         'data_fields' => {},
704         'sum' => [],
705         'isbn' => [],
706         'defaults' => {}
707     };
708
709     $self->_foreach_mapping(sub {
710         my ($name, $type, $facet, $suggestible, $sort, $marc_type, $marc_field) = @_;
711         return if $marc_type ne $marcflavour;
712
713         if ($type eq 'sum') {
714             push @{$rules->{sum}}, $name;
715         }
716         elsif ($type eq 'isbn') {
717             push @{$rules->{isbn}}, $name;
718         }
719         elsif ($type eq 'boolean') {
720             # boolean gets special handling, if value doesn't exist for a field,
721             # it is set to false
722             $rules->{defaults}->{$name} = 'false';
723         }
724
725         if ($marc_field =~ $field_spec_regexp) {
726             my $field_tag = $1;
727
728             my @subfields;
729             my @subfield_groups;
730             # Parse and separate subfields form subfield groups
731             if (defined $2) {
732                 my $subfield_group = '';
733                 my $open_group = 0;
734
735                 foreach my $token (split //, $2) {
736                     if ($token eq "(") {
737                         if ($open_group) {
738                             Koha::Exceptions::Elasticsearch::MARCFieldExprParseError->throw(
739                                 "Unmatched opening parenthesis for $marc_field"
740                             );
741                         }
742                         else {
743                             $open_group = 1;
744                         }
745                     }
746                     elsif ($token eq ")") {
747                         if ($open_group) {
748                             if ($subfield_group) {
749                                 push @subfield_groups, $subfield_group;
750                                 $subfield_group = '';
751                             }
752                             $open_group = 0;
753                         }
754                         else {
755                             Koha::Exceptions::Elasticsearch::MARCFieldExprParseError->throw(
756                                 "Unmatched closing parenthesis for $marc_field"
757                             );
758                         }
759                     }
760                     elsif ($open_group) {
761                         $subfield_group .= $token;
762                     }
763                     else {
764                         push @subfields, $token;
765                     }
766                 }
767             }
768             else {
769                 push @subfields, '*';
770             }
771
772             my $range = defined $3 ? $3 : undef;
773             my @mappings = $self->_field_mappings($facet, $suggestible, $sort, $name, $type, $range);
774
775             if ($field_tag < 10) {
776                 $rules->{control_fields}->{$field_tag} //= [];
777                 push @{$rules->{control_fields}->{$field_tag}}, @mappings;
778             }
779             else {
780                 $rules->{data_fields}->{$field_tag} //= {};
781                 foreach my $subfield (@subfields) {
782                     $rules->{data_fields}->{$field_tag}->{subfields}->{$subfield} //= [];
783                     push @{$rules->{data_fields}->{$field_tag}->{subfields}->{$subfield}}, @mappings;
784                 }
785                 foreach my $subfield_group (@subfield_groups) {
786                     $rules->{data_fields}->{$field_tag}->{subfields_join}->{$subfield_group} //= [];
787                     push @{$rules->{data_fields}->{$field_tag}->{subfields_join}->{$subfield_group}}, @mappings;
788                 }
789             }
790         }
791         elsif ($marc_field =~ $leader_regexp) {
792             my $range = defined $1 ? $1 : undef;
793             my @mappings = $self->_field_mappings($facet, $suggestible, $sort, $name, $type, $range);
794             push @{$rules->{leader}}, @mappings;
795         }
796         else {
797             Koha::Exceptions::Elasticsearch::MARCFieldExprParseError->throw(
798                 "Invalid MARC field expression: $marc_field"
799             );
800         }
801     });
802     return $rules;
803 }
804
805 =head2 _foreach_mapping
806
807     $self->_foreach_mapping(
808         sub {
809             my ( $name, $type, $facet, $suggestible, $sort, $marc_type,
810                 $marc_field )
811               = @_;
812             return unless $marc_type eq 'marc21';
813             print "Data comes from: " . $marc_field . "\n";
814         }
815     );
816
817 This allows you to apply a function to each entry in the elasticsearch mappings
818 table, in order to build the mappings for whatever is needed.
819
820 In the provided function, the files are:
821
822 =over 4
823
824 =item C<$name>
825
826 The field name for elasticsearch (corresponds to the 'mapping' column in the
827 database.
828
829 =item C<$type>
830
831 The type for this value, e.g. 'string'.
832
833 =item C<$facet>
834
835 True if this value should be facetised. This only really makes sense if the
836 field is understood by the facet processing code anyway.
837
838 =item C<$sort>
839
840 True if this is a field that a) needs special sort handling, and b) if it
841 should be sorted on. False if a) but not b). Undef if not a). This allows,
842 for example, author to be sorted on but not everything marked with "author"
843 to be included in that sort.
844
845 =item C<$marc_type>
846
847 A string that indicates the MARC type that this mapping is for, e.g. 'marc21',
848 'unimarc', 'normarc'.
849
850 =item C<$marc_field>
851
852 A string that describes the MARC field that contains the data to extract.
853 These are of a form suited to Catmandu's MARC fixers.
854
855 =back
856
857 =cut
858
859 sub _foreach_mapping {
860     my ( $self, $sub ) = @_;
861
862     # TODO use a caching framework here
863     my $search_fields = Koha::Database->schema->resultset('SearchField')->search(
864         {
865             'search_marc_map.index_name' => $self->index,
866         },
867         {   join => { search_marc_to_fields => 'search_marc_map' },
868             '+select' => [
869                 'search_marc_to_fields.facet',
870                 'search_marc_to_fields.suggestible',
871                 'search_marc_to_fields.sort',
872                 'search_marc_map.marc_type',
873                 'search_marc_map.marc_field',
874             ],
875             '+as'     => [
876                 'facet',
877                 'suggestible',
878                 'sort',
879                 'marc_type',
880                 'marc_field',
881             ],
882         }
883     );
884
885     while ( my $search_field = $search_fields->next ) {
886         $sub->(
887             # Force lower case on indexed field names for case insensitive
888             # field name searches
889             lc($search_field->name),
890             $search_field->type,
891             $search_field->get_column('facet'),
892             $search_field->get_column('suggestible'),
893             $search_field->get_column('sort'),
894             $search_field->get_column('marc_type'),
895             $search_field->get_column('marc_field'),
896         );
897     }
898 }
899
900 =head2 process_error
901
902     die process_error($@);
903
904 This parses an Elasticsearch error message and produces a human-readable
905 result from it. This result is probably missing all the useful information
906 that you might want in diagnosing an issue, so the warning is also logged.
907
908 Note that currently the resulting message is not internationalised. This
909 will happen eventually by some method or other.
910
911 =cut
912
913 sub process_error {
914     my ($self, $msg) = @_;
915
916     warn $msg; # simple logging
917
918     # This is super-primitive
919     return "Unable to understand your search query, please rephrase and try again.\n" if $msg =~ /ParseException/;
920
921     return "Unable to perform your search. Please try again.\n";
922 }
923
924 =head2 _read_configuration
925
926     my $conf = _read_configuration();
927
928 Reads the I<configuration file> and returns a hash structure with the
929 configuration information. It raises an exception if mandatory entries
930 are missing.
931
932 The hashref structure has the following form:
933
934     {
935         'nodes' => ['127.0.0.1:9200', 'anotherserver:9200'],
936         'index_name' => 'koha_instance',
937     }
938
939 This is configured by the following in the C<config> block in koha-conf.xml:
940
941     <elasticsearch>
942         <server>127.0.0.1:9200</server>
943         <server>anotherserver:9200</server>
944         <index_name>koha_instance</index_name>
945     </elasticsearch>
946
947 =cut
948
949 sub _read_configuration {
950
951     my $configuration;
952
953     my $conf = C4::Context->config('elasticsearch');
954     Koha::Exceptions::Config::MissingEntry->throw(
955         "Missing 'elasticsearch' block in config file")
956       unless defined $conf;
957
958     if ( $conf && $conf->{server} ) {
959         my $nodes = $conf->{server};
960         if ( ref($nodes) eq 'ARRAY' ) {
961             $configuration->{nodes} = $nodes;
962         }
963         else {
964             $configuration->{nodes} = [$nodes];
965         }
966     }
967     else {
968         Koha::Exceptions::Config::MissingEntry->throw(
969             "Missing 'server' entry in config file for elasticsearch");
970     }
971
972     if ( defined $conf->{index_name} ) {
973         $configuration->{index_name} = $conf->{index_name};
974     }
975     else {
976         Koha::Exceptions::Config::MissingEntry->throw(
977             "Missing 'index_name' entry in config file for elasticsearch");
978     }
979
980     return $configuration;
981 }
982
983 =head2 get_facetable_fields
984
985 my @facetable_fields = Koha::SearchEngine::Elasticsearch->get_facetable_fields();
986
987 Returns the list of Koha::SearchFields marked to be faceted in the ES configuration
988
989 =cut
990
991 sub get_facetable_fields {
992     my ($self) = @_;
993
994     # These should correspond to the ES field names, as opposed to the CCL
995     # things that zebra uses.
996     my @search_field_names = qw( author itype location su-geo title-series subject ccode holdingbranch homebranch ln );
997     my @faceted_fields = Koha::SearchFields->search(
998         { name => { -in => \@search_field_names }, facet_order => { '!=' => undef } }, { order_by => ['facet_order'] }
999     );
1000     my @not_faceted_fields = Koha::SearchFields->search(
1001         { name => { -in => \@search_field_names }, facet_order => undef }, { order_by => ['facet_order'] }
1002     );
1003     # This could certainly be improved
1004     return ( @faceted_fields, @not_faceted_fields );
1005 }
1006
1007 1;
1008
1009 __END__
1010
1011 =head1 AUTHOR
1012
1013 =over 4
1014
1015 =item Chris Cormack C<< <chrisc@catalyst.net.nz> >>
1016
1017 =item Robin Sheat C<< <robin@catalyst.net.nz> >>
1018
1019 =item Jonathan Druart C<< <jonathan.druart@bugs.koha-community.org> >>
1020
1021 =back
1022
1023 =cut