4435015d81b49f6b66cfa23a25c224f7f9361f51
[koha-equinox.git] / Koha / SearchEngine / Elasticsearch.pm
1 package Koha::SearchEngine::Elasticsearch;
2
3 # Copyright 2015 Catalyst IT
4 #
5 # This file is part of Koha.
6 #
7 # Koha is free software; you can redistribute it and/or modify it under the
8 # terms of the GNU General Public License as published by the Free Software
9 # Foundation; either version 3 of the License, or (at your option) any later
10 # version.
11 #
12 # Koha is distributed in the hope that it will be useful, but WITHOUT ANY
13 # WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
14 # A PARTICULAR PURPOSE.  See the GNU General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License along
17 # with Koha; if not, write to the Free Software Foundation, Inc.,
18 # 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
19
20 use base qw(Class::Accessor);
21
22 use C4::Context;
23
24 use Koha::Database;
25 use Koha::Exceptions::Config;
26 use Koha::SearchFields;
27 use Koha::SearchMarcMaps;
28
29 use Carp;
30 use Clone qw(clone);
31 use JSON;
32 use Modern::Perl;
33 use Readonly;
34 use Search::Elasticsearch;
35 use Try::Tiny;
36 use YAML::Syck;
37
38 use List::Util qw( sum0 reduce );
39 use MARC::File::XML;
40 use MIME::Base64;
41 use Encode qw(encode);
42 use Business::ISBN;
43
44 __PACKAGE__->mk_ro_accessors(qw( index ));
45 __PACKAGE__->mk_accessors(qw( sort_fields ));
46
47 # Constants to refer to the standard index names
48 Readonly our $BIBLIOS_INDEX     => 'biblios';
49 Readonly our $AUTHORITIES_INDEX => 'authorities';
50
51 =head1 NAME
52
53 Koha::SearchEngine::Elasticsearch - Base module for things using elasticsearch
54
55 =head1 ACCESSORS
56
57 =over 4
58
59 =item index
60
61 The name of the index to use, generally 'biblios' or 'authorities'.
62
63 =back
64
65 =head1 FUNCTIONS
66
67 =cut
68
69 sub new {
70     my $class = shift @_;
71     my $self = $class->SUPER::new(@_);
72     # Check for a valid index
73     Koha::Exceptions::MissingParameter->throw('No index name provided') unless $self->index;
74     return $self;
75 }
76
77 =head2 get_elasticsearch
78
79     my $elasticsearch_client = $self->get_elasticsearch();
80
81 Returns a C<Search::Elasticsearch> client. The client is cached on a C<Koha::SearchEngine::ElasticSearch>
82 instance level and will be reused if method is called multiple times.
83
84 =cut
85
86 sub get_elasticsearch {
87     my $self = shift @_;
88     unless (defined $self->{elasticsearch}) {
89         my $conf = $self->get_elasticsearch_params();
90         $self->{elasticsearch} = Search::Elasticsearch->new($conf);
91     }
92     return $self->{elasticsearch};
93 }
94
95 =head2 get_elasticsearch_params
96
97     my $params = $self->get_elasticsearch_params();
98
99 This provides a hashref that contains the parameters for connecting to the
100 ElasicSearch servers, in the form:
101
102     {
103         'nodes' => ['127.0.0.1:9200', 'anotherserver:9200'],
104         'index_name' => 'koha_instance_index',
105     }
106
107 This is configured by the following in the C<config> block in koha-conf.xml:
108
109     <elasticsearch>
110         <server>127.0.0.1:9200</server>
111         <server>anotherserver:9200</server>
112         <index_name>koha_instance</index_name>
113     </elasticsearch>
114
115 =cut
116
117 sub get_elasticsearch_params {
118     my ($self) = @_;
119
120     # Copy the hash so that we're not modifying the original
121     my $conf = C4::Context->config('elasticsearch');
122     die "No 'elasticsearch' block is defined in koha-conf.xml.\n" if ( !$conf );
123     my $es = { %{ $conf } };
124
125     # Helpfully, the multiple server lines end up in an array for us anyway
126     # if there are multiple ones, but not if there's only one.
127     my $server = $es->{server};
128     delete $es->{server};
129     if ( ref($server) eq 'ARRAY' ) {
130
131         # store it called 'nodes' (which is used by newer Search::Elasticsearch)
132         $es->{nodes} = $server;
133     }
134     elsif ($server) {
135         $es->{nodes} = [$server];
136     }
137     else {
138         die "No elasticsearch servers were specified in koha-conf.xml.\n";
139     }
140     die "No elasticsearch index_name was specified in koha-conf.xml.\n"
141       if ( !$es->{index_name} );
142     # Append the name of this particular index to our namespace
143     $es->{index_name} .= '_' . $self->index;
144
145     $es->{key_prefix} = 'es_';
146     $es->{client} //= '5_0::Direct';
147     $es->{cxn_pool} //= 'Static';
148     $es->{request_timeout} //= 60;
149
150     return $es;
151 }
152
153 =head2 get_elasticsearch_settings
154
155     my $settings = $self->get_elasticsearch_settings();
156
157 This provides the settings provided to Elasticsearch when an index is created.
158 These can do things like define tokenization methods.
159
160 A hashref containing the settings is returned.
161
162 =cut
163
164 sub get_elasticsearch_settings {
165     my ($self) = @_;
166
167     # Use state to speed up repeated calls
168     state $settings = undef;
169     if (!defined $settings) {
170         my $config_file = C4::Context->config('elasticsearch_index_config');
171         $config_file ||= C4::Context->config('intranetdir') . '/admin/searchengine/elasticsearch/index_config.yaml';
172         $settings = LoadFile( $config_file );
173     }
174
175     return $settings;
176 }
177
178 =head2 get_elasticsearch_mappings
179
180     my $mappings = $self->get_elasticsearch_mappings();
181
182 This provides the mappings that get passed to Elasticsearch when an index is
183 created.
184
185 =cut
186
187 sub get_elasticsearch_mappings {
188     my ($self) = @_;
189
190     # Use state to speed up repeated calls
191     state %all_mappings;
192     state %sort_fields;
193
194     if (!defined $all_mappings{$self->index}) {
195         $sort_fields{$self->index} = {};
196         # Clone the general mapping to break ties with the original hash
197         my $mappings = {
198             data => clone(_get_elasticsearch_field_config('general', ''))
199         };
200         my $marcflavour = lc C4::Context->preference('marcflavour');
201         $self->_foreach_mapping(
202             sub {
203                 my ( $name, $type, $facet, $suggestible, $sort, $marc_type ) = @_;
204
205                 return if $marc_type ne $marcflavour;
206                 # TODO if this gets any sort of complexity to it, it should
207                 # be broken out into its own function.
208
209                 # TODO be aware of date formats, but this requires pre-parsing
210                 # as ES will simply reject anything with an invalid date.
211                 my $es_type = 'text';
212                 if ($type eq 'boolean') {
213                     $es_type = 'boolean';
214                 } elsif ($type eq 'number' || $type eq 'sum') {
215                     $es_type = 'integer';
216                 } elsif ($type eq 'isbn' || $type eq 'stdno') {
217                     $es_type = 'stdno';
218                 }
219
220                 $mappings->{data}{properties}{$name} = _get_elasticsearch_field_config('search', $es_type);
221
222                 if ($facet) {
223                     $mappings->{data}{properties}{ $name . '__facet' } = _get_elasticsearch_field_config('facet', $es_type);
224                 }
225                 if ($suggestible) {
226                     $mappings->{data}{properties}{ $name . '__suggestion' } = _get_elasticsearch_field_config('suggestible', $es_type);
227                 }
228                 # Sort is a bit special as it can be true, false, undef.
229                 # We care about "true" or "undef",
230                 # "undef" means to do the default thing, which is make it sortable.
231                 if (!defined $sort || $sort) {
232                     $mappings->{data}{properties}{ $name . '__sort' } = _get_elasticsearch_field_config('sort', $es_type);
233                     $sort_fields{$self->index}{$name} = 1;
234                 }
235             }
236         );
237         $all_mappings{$self->index} = $mappings;
238     }
239     $self->sort_fields(\%{$sort_fields{$self->index}});
240
241     return $all_mappings{$self->index};
242 }
243
244 =head2 _get_elasticsearch_field_config
245
246 Get the Elasticsearch field config for the given purpose and data type.
247
248 $mapping = _get_elasticsearch_field_config('search', 'text');
249
250 =cut
251
252 sub _get_elasticsearch_field_config {
253
254     my ( $purpose, $type ) = @_;
255
256     # Use state to speed up repeated calls
257     state $settings = undef;
258     if (!defined $settings) {
259         my $config_file = C4::Context->config('elasticsearch_field_config');
260         $config_file ||= C4::Context->config('intranetdir') . '/admin/searchengine/elasticsearch/field_config.yaml';
261         $settings = LoadFile( $config_file );
262     }
263
264     if (!defined $settings->{$purpose}) {
265         die "Field purpose $purpose not defined in field config";
266     }
267     if ($type eq '') {
268         return $settings->{$purpose};
269     }
270     if (defined $settings->{$purpose}{$type}) {
271         return $settings->{$purpose}{$type};
272     }
273     if (defined $settings->{$purpose}{'default'}) {
274         return $settings->{$purpose}{'default'};
275     }
276     return;
277 }
278
279 sub reset_elasticsearch_mappings {
280     my ( $reset_fields ) = @_;
281     my $mappings_yaml = C4::Context->config('elasticsearch_index_mappings');
282     $mappings_yaml ||= C4::Context->config('intranetdir') . '/admin/searchengine/elasticsearch/mappings.yaml';
283     my $indexes = LoadFile( $mappings_yaml );
284
285     while ( my ( $index_name, $fields ) = each %$indexes ) {
286         while ( my ( $field_name, $data ) = each %$fields ) {
287             my %sf_params = map { $_ => $data->{$_} } grep { exists $data->{$_} } qw/ type label weight facet_order /;
288             $sf_params{name} = $field_name;
289
290             my $search_field = Koha::SearchFields->find_or_create( \%sf_params, { key => 'name' } );
291
292             my $mappings = $data->{mappings};
293             for my $mapping ( @$mappings ) {
294                 my $marc_field = Koha::SearchMarcMaps->find_or_create({ index_name => $index_name, marc_type => $mapping->{marc_type}, marc_field => $mapping->{marc_field} });
295                 $search_field->add_to_search_marc_maps($marc_field, { facet => $mapping->{facet} || 0, suggestible => $mapping->{suggestible} || 0, sort => $mapping->{sort} } );
296             }
297         }
298     }
299 }
300
301 # This overrides the accessor provided by Class::Accessor so that if
302 # sort_fields isn't set, then it'll generate it.
303 sub sort_fields {
304     my $self = shift;
305     if (@_) {
306         $self->_sort_fields_accessor(@_);
307         return;
308     }
309     my $val = $self->_sort_fields_accessor();
310     return $val if $val;
311
312     # This will populate the accessor as a side effect
313     $self->get_elasticsearch_mappings();
314     return $self->_sort_fields_accessor();
315 }
316
317 =head2 _process_mappings($mappings, $data, $record_document, $altscript)
318
319     $self->_process_mappings($mappings, $marc_field_data, $record_document, 0)
320
321 Process all C<$mappings> targets operating on a specific MARC field C<$data>.
322 Since we group all mappings by MARC field targets C<$mappings> will contain
323 all targets for C<$data> and thus we need to fetch the MARC field only once.
324 C<$mappings> will be applied to C<$record_document> and new field values added.
325 The method has no return value.
326
327 =over 4
328
329 =item C<$mappings>
330
331 Arrayref of mappings containing arrayrefs in the format
332 [C<$target>, C<$options>] where C<$target> is the name of the target field and
333 C<$options> is a hashref containing processing directives for this particular
334 mapping.
335
336 =item C<$data>
337
338 The source data from a MARC record field.
339
340 =item C<$record_document>
341
342 Hashref representing the Elasticsearch document on which mappings should be
343 applied.
344
345 =item C<$altscript>
346
347 A boolean value indicating whether an alternate script presentation is being
348 processed.
349
350 =back
351
352 =cut
353
354 sub _process_mappings {
355     my ($_self, $mappings, $data, $record_document, $altscript) = @_;
356     foreach my $mapping (@{$mappings}) {
357         my ($target, $options) = @{$mapping};
358
359         # Don't process sort fields for alternate scripts
360         my $sort = $target =~ /__sort$/;
361         if ($sort && $altscript) {
362             next;
363         }
364
365         # Copy (scalar) data since can have multiple targets
366         # with differing options for (possibly) mutating data
367         # so need a different copy for each
368         my $_data = $data;
369         $record_document->{$target} //= [];
370         if (defined $options->{substr}) {
371             my ($start, $length) = @{$options->{substr}};
372             $_data = length($data) > $start ? substr $data, $start, $length : '';
373         }
374         if (defined $options->{value_callbacks}) {
375             $_data = reduce { $b->($a) } ($_data, @{$options->{value_callbacks}});
376         }
377         if (defined $options->{property}) {
378             $_data = {
379                 $options->{property} => $_data
380             }
381         }
382         push @{$record_document->{$target}}, $_data;
383     }
384 }
385
386 =head2 marc_records_to_documents($marc_records)
387
388     my @record_documents = $self->marc_records_to_documents($marc_records);
389
390 Using mappings stored in database convert C<$marc_records> to Elasticsearch documents.
391
392 Returns array of hash references, representing Elasticsearch documents,
393 acceptable as body payload in C<Search::Elasticsearch> requests.
394
395 =over 4
396
397 =item C<$marc_documents>
398
399 Reference to array of C<MARC::Record> objects to be converted to Elasticsearch documents.
400
401 =back
402
403 =cut
404
405 sub marc_records_to_documents {
406     my ($self, $records) = @_;
407     my $rules = $self->_get_marc_mapping_rules();
408     my $control_fields_rules = $rules->{control_fields};
409     my $data_fields_rules = $rules->{data_fields};
410     my $marcflavour = lc C4::Context->preference('marcflavour');
411
412     my @record_documents;
413
414     foreach my $record (@{$records}) {
415         my $record_document = {};
416         my $mappings = $rules->{leader};
417         if ($mappings) {
418             $self->_process_mappings($mappings, $record->leader(), $record_document, 0);
419         }
420         foreach my $field ($record->fields()) {
421             if ($field->is_control_field()) {
422                 my $mappings = $control_fields_rules->{$field->tag()};
423                 if ($mappings) {
424                     $self->_process_mappings($mappings, $field->data(), $record_document, 0);
425                 }
426             }
427             else {
428                 my $tag = $field->tag();
429                 # Handle alternate scripts in MARC 21
430                 my $altscript = 0;
431                 if ($marcflavour eq 'marc21' && $tag eq '880') {
432                     my $sub6 = $field->subfield('6');
433                     if ($sub6 =~ /^(...)-\d+/) {
434                         $tag = $1;
435                         $altscript = 1;
436                     }
437                 }
438
439                 my $data_field_rules = $data_fields_rules->{$tag};
440
441                 if ($data_field_rules) {
442                     my $subfields_mappings = $data_field_rules->{subfields};
443                     my $wildcard_mappings = $subfields_mappings->{'*'};
444                     foreach my $subfield ($field->subfields()) {
445                         my ($code, $data) = @{$subfield};
446                         my $mappings = $subfields_mappings->{$code} // [];
447                         if ($wildcard_mappings) {
448                             $mappings = [@{$mappings}, @{$wildcard_mappings}];
449                         }
450                         if (@{$mappings}) {
451                             $self->_process_mappings($mappings, $data, $record_document, $altscript);
452                         }
453                     }
454
455                     my $subfields_join_mappings = $data_field_rules->{subfields_join};
456                     if ($subfields_join_mappings) {
457                         foreach my $subfields_group (keys %{$subfields_join_mappings}) {
458                             # Map each subfield to values, remove empty values, join with space
459                             my $data = join(
460                                 ' ',
461                                 grep(
462                                     $_,
463                                     map { join(' ', $field->subfield($_)) } split(//, $subfields_group)
464                                 )
465                             );
466                             if ($data) {
467                                 $self->_process_mappings($subfields_join_mappings->{$subfields_group}, $data, $record_document, $altscript);
468                             }
469                         }
470                     }
471                 }
472             }
473         }
474         foreach my $field (keys %{$rules->{defaults}}) {
475             unless (defined $record_document->{$field}) {
476                 $record_document->{$field} = $rules->{defaults}->{$field};
477             }
478         }
479         foreach my $field (@{$rules->{sum}}) {
480             if (defined $record_document->{$field}) {
481                 # TODO: validate numeric? filter?
482                 # TODO: Or should only accept fields without nested values?
483                 # TODO: Quick and dirty, improve if needed
484                 $record_document->{$field} = sum0(grep { !ref($_) && m/\d+(\.\d+)?/} @{$record_document->{$field}});
485             }
486         }
487         # Index all applicable ISBN forms (ISBN-10 and ISBN-13 with and without dashes)
488         foreach my $field (@{$rules->{isbn}}) {
489             if (defined $record_document->{$field}) {
490                 my @isbns = ();
491                 foreach my $input_isbn (@{$record_document->{$field}}) {
492                     my $isbn = Business::ISBN->new($input_isbn);
493                     if (defined $isbn && $isbn->is_valid) {
494                         my $isbn13 = $isbn->as_isbn13->as_string;
495                         push @isbns, $isbn13;
496                         $isbn13 =~ s/\-//g;
497                         push @isbns, $isbn13;
498
499                         my $isbn10 = $isbn->as_isbn10;
500                         if ($isbn10) {
501                             $isbn10 = $isbn10->as_string;
502                             push @isbns, $isbn10;
503                             $isbn10 =~ s/\-//g;
504                             push @isbns, $isbn10;
505                         }
506                     } else {
507                         push @isbns, $input_isbn;
508                     }
509                 }
510                 $record_document->{$field} = \@isbns;
511             }
512         }
513
514         # Remove duplicate values and collapse sort fields
515         foreach my $field (keys %{$record_document}) {
516             if (ref($record_document->{$field}) eq 'ARRAY') {
517                 @{$record_document->{$field}} = do {
518                     my %seen;
519                     grep { !$seen{ref($_) eq 'HASH' && defined $_->{input} ? $_->{input} : $_}++ } @{$record_document->{$field}};
520                 };
521                 if ($field =~ /__sort$/) {
522                     # Make sure to keep the sort field length sensible. 255 was chosen as a nice round value.
523                     $record_document->{$field} = [substr(join(' ', @{$record_document->{$field}}), 0, 255)];
524                 }
525             }
526         }
527
528         # TODO: Perhaps should check if $records_document non empty, but really should never be the case
529         $record->encoding('UTF-8');
530         my @warnings;
531         {
532             # Temporarily intercept all warn signals (MARC::Record carps when record length > 99999)
533             local $SIG{__WARN__} = sub {
534                 push @warnings, $_[0];
535             };
536             $record_document->{'marc_data'} = encode_base64(encode('UTF-8', $record->as_usmarc()));
537         }
538         if (@warnings) {
539             # Suppress warnings if record length exceeded
540             unless (substr($record->leader(), 0, 5) eq '99999') {
541                 foreach my $warning (@warnings) {
542                     carp $warning;
543                 }
544             }
545             $record_document->{'marc_data'} = $record->as_xml_record($marcflavour);
546             $record_document->{'marc_format'} = 'MARCXML';
547         }
548         else {
549             $record_document->{'marc_format'} = 'base64ISO2709';
550         }
551         my $id = $record->subfield('999', 'c');
552         push @record_documents, [$id, $record_document];
553     }
554     return \@record_documents;
555 }
556
557 =head2 _field_mappings($facet, $suggestible, $sort, $target_name, $target_type, $range)
558
559     my @mappings = _field_mappings($facet, $suggestible, $sort, $target_name, $target_type, $range)
560
561 Get mappings, an internal data structure later used by
562 L<_process_mappings($mappings, $data, $record_document, $altscript)> to process MARC target
563 data for a MARC mapping.
564
565 The returned C<$mappings> is not to to be confused with mappings provided by
566 C<_foreach_mapping>, rather this sub accepts properties from a mapping as
567 provided by C<_foreach_mapping> and expands it to this internal data structure.
568 In the caller context (C<_get_marc_mapping_rules>) the returned C<@mappings>
569 is then applied to each MARC target (leader, control field data, subfield or
570 joined subfields) and integrated into the mapping rules data structure used in
571 C<marc_records_to_documents> to transform MARC records into Elasticsearch
572 documents.
573
574 =over 4
575
576 =item C<$facet>
577
578 Boolean indicating whether to create a facet field for this mapping.
579
580 =item C<$suggestible>
581
582 Boolean indicating whether to create a suggestion field for this mapping.
583
584 =item C<$sort>
585
586 Boolean indicating whether to create a sort field for this mapping.
587
588 =item C<$target_name>
589
590 Elasticsearch document target field name.
591
592 =item C<$target_type>
593
594 Elasticsearch document target field type.
595
596 =item C<$range>
597
598 An optional range as a string in the format "<START>-<END>" or "<START>",
599 where "<START>" and "<END>" are integers specifying a range that will be used
600 for extracting a substring from MARC data as Elasticsearch field target value.
601
602 The first character position is "0", and the range is inclusive,
603 so "0-2" means the first three characters of MARC data.
604
605 If only "<START>" is provided only one character at position "<START>" will
606 be extracted.
607
608 =back
609
610 =cut
611
612 sub _field_mappings {
613     my ($_self, $facet, $suggestible, $sort, $target_name, $target_type, $range) = @_;
614     my %mapping_defaults = ();
615     my @mappings;
616
617     my $substr_args = undef;
618     if (defined $range) {
619         # TODO: use value_callback instead?
620         my ($start, $end) = map(int, split /-/, $range, 2);
621         $substr_args = [$start];
622         push @{$substr_args}, (defined $end ? $end - $start + 1 : 1);
623     }
624     my $default_options = {};
625     if ($substr_args) {
626         $default_options->{substr} = $substr_args;
627     }
628
629     # TODO: Should probably have per type value callback/hook
630     # but hard code for now
631     if ($target_type eq 'boolean') {
632         $default_options->{value_callbacks} //= [];
633         push @{$default_options->{value_callbacks}}, sub {
634             my ($value) = @_;
635             # Trim whitespace at both ends
636             $value =~ s/^\s+|\s+$//g;
637             return $value ? 'true' : 'false';
638         };
639     }
640
641     my $mapping = [$target_name, $default_options];
642     push @mappings, $mapping;
643
644     my @suffixes = ();
645     push @suffixes, 'facet' if $facet;
646     push @suffixes, 'suggestion' if $suggestible;
647     push @suffixes, 'sort' if !defined $sort || $sort;
648
649     foreach my $suffix (@suffixes) {
650         my $mapping = ["${target_name}__$suffix"];
651         # TODO: Hack, fix later in less hideous manner
652         if ($suffix eq 'suggestion') {
653             push @{$mapping}, {%{$default_options}, property => 'input'};
654         }
655         else {
656             push @{$mapping}, $default_options;
657         }
658         push @mappings, $mapping;
659     }
660     return @mappings;
661 };
662
663 =head2 _get_marc_mapping_rules
664
665     my $mapping_rules = $self->_get_marc_mapping_rules()
666
667 Generates rules from mappings stored in database for MARC records to Elasticsearch JSON document conversion.
668
669 Since field retrieval is slow in C<MARC::Records> (all fields are itereted through for
670 each call to C<MARC::Record>->field) we create an optimized structure of mapping
671 rules keyed by MARC field tags holding all the mapping rules for that particular tag.
672
673 We can then iterate through all MARC fields for each record and apply all relevant
674 rules once per fields instead of retreiving fields multiple times for each mapping rule
675 which is terribly slow.
676
677 =cut
678
679 # TODO: This structure can be used for processing multiple MARC::Records so is currently
680 # rebuilt for each batch. Since it is cacheable it could also be stored in an in
681 # memory cache which it is currently not. The performance gain of caching
682 # would probably be marginal, but to do this could be a further improvement.
683
684 sub _get_marc_mapping_rules {
685     my ($self) = @_;
686     my $marcflavour = lc C4::Context->preference('marcflavour');
687     my $field_spec_regexp = qr/^([0-9]{3})([()0-9a-z]+)?(?:_\/(\d+(?:-\d+)?))?$/;
688     my $leader_regexp = qr/^leader(?:_\/(\d+(?:-\d+)?))?$/;
689     my $rules = {
690         'leader' => [],
691         'control_fields' => {},
692         'data_fields' => {},
693         'sum' => [],
694         'isbn' => [],
695         'defaults' => {}
696     };
697
698     $self->_foreach_mapping(sub {
699         my ($name, $type, $facet, $suggestible, $sort, $marc_type, $marc_field) = @_;
700         return if $marc_type ne $marcflavour;
701
702         if ($type eq 'sum') {
703             push @{$rules->{sum}}, $name;
704         }
705         elsif ($type eq 'isbn') {
706             push @{$rules->{isbn}}, $name;
707         }
708         elsif ($type eq 'boolean') {
709             # boolean gets special handling, if value doesn't exist for a field,
710             # it is set to false
711             $rules->{defaults}->{$name} = 'false';
712         }
713
714         if ($marc_field =~ $field_spec_regexp) {
715             my $field_tag = $1;
716
717             my @subfields;
718             my @subfield_groups;
719             # Parse and separate subfields form subfield groups
720             if (defined $2) {
721                 my $subfield_group = '';
722                 my $open_group = 0;
723
724                 foreach my $token (split //, $2) {
725                     if ($token eq "(") {
726                         if ($open_group) {
727                             Koha::Exceptions::Elasticsearch::MARCFieldExprParseError->throw(
728                                 "Unmatched opening parenthesis for $marc_field"
729                             );
730                         }
731                         else {
732                             $open_group = 1;
733                         }
734                     }
735                     elsif ($token eq ")") {
736                         if ($open_group) {
737                             if ($subfield_group) {
738                                 push @subfield_groups, $subfield_group;
739                                 $subfield_group = '';
740                             }
741                             $open_group = 0;
742                         }
743                         else {
744                             Koha::Exceptions::Elasticsearch::MARCFieldExprParseError->throw(
745                                 "Unmatched closing parenthesis for $marc_field"
746                             );
747                         }
748                     }
749                     elsif ($open_group) {
750                         $subfield_group .= $token;
751                     }
752                     else {
753                         push @subfields, $token;
754                     }
755                 }
756             }
757             else {
758                 push @subfields, '*';
759             }
760
761             my $range = defined $3 ? $3 : undef;
762             my @mappings = $self->_field_mappings($facet, $suggestible, $sort, $name, $type, $range);
763
764             if ($field_tag < 10) {
765                 $rules->{control_fields}->{$field_tag} //= [];
766                 push @{$rules->{control_fields}->{$field_tag}}, @mappings;
767             }
768             else {
769                 $rules->{data_fields}->{$field_tag} //= {};
770                 foreach my $subfield (@subfields) {
771                     $rules->{data_fields}->{$field_tag}->{subfields}->{$subfield} //= [];
772                     push @{$rules->{data_fields}->{$field_tag}->{subfields}->{$subfield}}, @mappings;
773                 }
774                 foreach my $subfield_group (@subfield_groups) {
775                     $rules->{data_fields}->{$field_tag}->{subfields_join}->{$subfield_group} //= [];
776                     push @{$rules->{data_fields}->{$field_tag}->{subfields_join}->{$subfield_group}}, @mappings;
777                 }
778             }
779         }
780         elsif ($marc_field =~ $leader_regexp) {
781             my $range = defined $1 ? $1 : undef;
782             my @mappings = $self->_field_mappings($facet, $suggestible, $sort, $name, $type, $range);
783             push @{$rules->{leader}}, @mappings;
784         }
785         else {
786             Koha::Exceptions::Elasticsearch::MARCFieldExprParseError->throw(
787                 "Invalid MARC field expression: $marc_field"
788             );
789         }
790     });
791     return $rules;
792 }
793
794 =head2 _foreach_mapping
795
796     $self->_foreach_mapping(
797         sub {
798             my ( $name, $type, $facet, $suggestible, $sort, $marc_type,
799                 $marc_field )
800               = @_;
801             return unless $marc_type eq 'marc21';
802             print "Data comes from: " . $marc_field . "\n";
803         }
804     );
805
806 This allows you to apply a function to each entry in the elasticsearch mappings
807 table, in order to build the mappings for whatever is needed.
808
809 In the provided function, the files are:
810
811 =over 4
812
813 =item C<$name>
814
815 The field name for elasticsearch (corresponds to the 'mapping' column in the
816 database.
817
818 =item C<$type>
819
820 The type for this value, e.g. 'string'.
821
822 =item C<$facet>
823
824 True if this value should be facetised. This only really makes sense if the
825 field is understood by the facet processing code anyway.
826
827 =item C<$sort>
828
829 True if this is a field that a) needs special sort handling, and b) if it
830 should be sorted on. False if a) but not b). Undef if not a). This allows,
831 for example, author to be sorted on but not everything marked with "author"
832 to be included in that sort.
833
834 =item C<$marc_type>
835
836 A string that indicates the MARC type that this mapping is for, e.g. 'marc21',
837 'unimarc', 'normarc'.
838
839 =item C<$marc_field>
840
841 A string that describes the MARC field that contains the data to extract.
842 These are of a form suited to Catmandu's MARC fixers.
843
844 =back
845
846 =cut
847
848 sub _foreach_mapping {
849     my ( $self, $sub ) = @_;
850
851     # TODO use a caching framework here
852     my $search_fields = Koha::Database->schema->resultset('SearchField')->search(
853         {
854             'search_marc_map.index_name' => $self->index,
855         },
856         {   join => { search_marc_to_fields => 'search_marc_map' },
857             '+select' => [
858                 'search_marc_to_fields.facet',
859                 'search_marc_to_fields.suggestible',
860                 'search_marc_to_fields.sort',
861                 'search_marc_map.marc_type',
862                 'search_marc_map.marc_field',
863             ],
864             '+as'     => [
865                 'facet',
866                 'suggestible',
867                 'sort',
868                 'marc_type',
869                 'marc_field',
870             ],
871         }
872     );
873
874     while ( my $search_field = $search_fields->next ) {
875         $sub->(
876             # Force lower case on indexed field names for case insensitive
877             # field name searches
878             lc($search_field->name),
879             $search_field->type,
880             $search_field->get_column('facet'),
881             $search_field->get_column('suggestible'),
882             $search_field->get_column('sort'),
883             $search_field->get_column('marc_type'),
884             $search_field->get_column('marc_field'),
885         );
886     }
887 }
888
889 =head2 process_error
890
891     die process_error($@);
892
893 This parses an Elasticsearch error message and produces a human-readable
894 result from it. This result is probably missing all the useful information
895 that you might want in diagnosing an issue, so the warning is also logged.
896
897 Note that currently the resulting message is not internationalised. This
898 will happen eventually by some method or other.
899
900 =cut
901
902 sub process_error {
903     my ($self, $msg) = @_;
904
905     warn $msg; # simple logging
906
907     # This is super-primitive
908     return "Unable to understand your search query, please rephrase and try again.\n" if $msg =~ /ParseException/;
909
910     return "Unable to perform your search. Please try again.\n";
911 }
912
913 =head2 _read_configuration
914
915     my $conf = _read_configuration();
916
917 Reads the I<configuration file> and returns a hash structure with the
918 configuration information. It raises an exception if mandatory entries
919 are missing.
920
921 The hashref structure has the following form:
922
923     {
924         'nodes' => ['127.0.0.1:9200', 'anotherserver:9200'],
925         'index_name' => 'koha_instance',
926     }
927
928 This is configured by the following in the C<config> block in koha-conf.xml:
929
930     <elasticsearch>
931         <server>127.0.0.1:9200</server>
932         <server>anotherserver:9200</server>
933         <index_name>koha_instance</index_name>
934     </elasticsearch>
935
936 =cut
937
938 sub _read_configuration {
939
940     my $configuration;
941
942     my $conf = C4::Context->config('elasticsearch');
943     Koha::Exceptions::Config::MissingEntry->throw(
944         "Missing 'elasticsearch' block in config file")
945       unless defined $conf;
946
947     if ( $conf && $conf->{server} ) {
948         my $nodes = $conf->{server};
949         if ( ref($nodes) eq 'ARRAY' ) {
950             $configuration->{nodes} = $nodes;
951         }
952         else {
953             $configuration->{nodes} = [$nodes];
954         }
955     }
956     else {
957         Koha::Exceptions::Config::MissingEntry->throw(
958             "Missing 'server' entry in config file for elasticsearch");
959     }
960
961     if ( defined $conf->{index_name} ) {
962         $configuration->{index_name} = $conf->{index_name};
963     }
964     else {
965         Koha::Exceptions::Config::MissingEntry->throw(
966             "Missing 'index_name' entry in config file for elasticsearch");
967     }
968
969     return $configuration;
970 }
971
972 =head2 get_facetable_fields
973
974 my @facetable_fields = Koha::SearchEngine::Elasticsearch->get_facetable_fields();
975
976 Returns the list of Koha::SearchFields marked to be faceted in the ES configuration
977
978 =cut
979
980 sub get_facetable_fields {
981     my ($self) = @_;
982
983     # These should correspond to the ES field names, as opposed to the CCL
984     # things that zebra uses.
985     my @search_field_names = qw( author itype location su-geo title-series subject ccode holdingbranch homebranch ln );
986     my @faceted_fields = Koha::SearchFields->search(
987         { name => { -in => \@search_field_names }, facet_order => { '!=' => undef } }, { order_by => ['facet_order'] }
988     );
989     my @not_faceted_fields = Koha::SearchFields->search(
990         { name => { -in => \@search_field_names }, facet_order => undef }, { order_by => ['facet_order'] }
991     );
992     # This could certainly be improved
993     return ( @faceted_fields, @not_faceted_fields );
994 }
995
996 1;
997
998 __END__
999
1000 =head1 AUTHOR
1001
1002 =over 4
1003
1004 =item Chris Cormack C<< <chrisc@catalyst.net.nz> >>
1005
1006 =item Robin Sheat C<< <robin@catalyst.net.nz> >>
1007
1008 =item Jonathan Druart C<< <jonathan.druart@bugs.koha-community.org> >>
1009
1010 =back
1011
1012 =cut