7ba3e14f70df45eda1594f803bf32fb564dfd394
[koha.git] / Koha / SearchEngine / Elasticsearch.pm
1 package Koha::SearchEngine::Elasticsearch;
2
3 # Copyright 2015 Catalyst IT
4 #
5 # This file is part of Koha.
6 #
7 # Koha is free software; you can redistribute it and/or modify it under the
8 # terms of the GNU General Public License as published by the Free Software
9 # Foundation; either version 3 of the License, or (at your option) any later
10 # version.
11 #
12 # Koha is distributed in the hope that it will be useful, but WITHOUT ANY
13 # WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
14 # A PARTICULAR PURPOSE.  See the GNU General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License along
17 # with Koha; if not, write to the Free Software Foundation, Inc.,
18 # 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
19
20 use base qw(Class::Accessor);
21
22 use C4::Context;
23
24 use Koha::Database;
25 use Koha::Exceptions::Config;
26 use Koha::Exceptions::Elasticsearch;
27 use Koha::SearchFields;
28 use Koha::SearchMarcMaps;
29 use C4::Heading;
30
31 use Carp;
32 use Clone qw(clone);
33 use JSON;
34 use Modern::Perl;
35 use Readonly;
36 use Search::Elasticsearch;
37 use Try::Tiny;
38 use YAML::Syck;
39
40 use List::Util qw( sum0 reduce );
41 use MARC::File::XML;
42 use MIME::Base64;
43 use Encode qw(encode);
44 use Business::ISBN;
45
46 __PACKAGE__->mk_ro_accessors(qw( index ));
47 __PACKAGE__->mk_accessors(qw( sort_fields ));
48
49 # Constants to refer to the standard index names
50 Readonly our $BIBLIOS_INDEX     => 'biblios';
51 Readonly our $AUTHORITIES_INDEX => 'authorities';
52
53 =head1 NAME
54
55 Koha::SearchEngine::Elasticsearch - Base module for things using elasticsearch
56
57 =head1 ACCESSORS
58
59 =over 4
60
61 =item index
62
63 The name of the index to use, generally 'biblios' or 'authorities'.
64
65 =back
66
67 =head1 FUNCTIONS
68
69 =cut
70
71 sub new {
72     my $class = shift @_;
73     my $self = $class->SUPER::new(@_);
74     # Check for a valid index
75     Koha::Exceptions::MissingParameter->throw('No index name provided') unless $self->index;
76     return $self;
77 }
78
79 =head2 get_elasticsearch
80
81     my $elasticsearch_client = $self->get_elasticsearch();
82
83 Returns a C<Search::Elasticsearch> client. The client is cached on a C<Koha::SearchEngine::ElasticSearch>
84 instance level and will be reused if method is called multiple times.
85
86 =cut
87
88 sub get_elasticsearch {
89     my $self = shift @_;
90     unless (defined $self->{elasticsearch}) {
91         my $conf = $self->get_elasticsearch_params();
92         $self->{elasticsearch} = Search::Elasticsearch->new($conf);
93     }
94     return $self->{elasticsearch};
95 }
96
97 =head2 get_elasticsearch_params
98
99     my $params = $self->get_elasticsearch_params();
100
101 This provides a hashref that contains the parameters for connecting to the
102 ElasicSearch servers, in the form:
103
104     {
105         'nodes' => ['127.0.0.1:9200', 'anotherserver:9200'],
106         'index_name' => 'koha_instance_index',
107     }
108
109 This is configured by the following in the C<config> block in koha-conf.xml:
110
111     <elasticsearch>
112         <server>127.0.0.1:9200</server>
113         <server>anotherserver:9200</server>
114         <index_name>koha_instance</index_name>
115     </elasticsearch>
116
117 =cut
118
119 sub get_elasticsearch_params {
120     my ($self) = @_;
121
122     # Copy the hash so that we're not modifying the original
123     my $conf = C4::Context->config('elasticsearch');
124     die "No 'elasticsearch' block is defined in koha-conf.xml.\n" if ( !$conf );
125     my $es = { %{ $conf } };
126
127     # Helpfully, the multiple server lines end up in an array for us anyway
128     # if there are multiple ones, but not if there's only one.
129     my $server = $es->{server};
130     delete $es->{server};
131     if ( ref($server) eq 'ARRAY' ) {
132
133         # store it called 'nodes' (which is used by newer Search::Elasticsearch)
134         $es->{nodes} = $server;
135     }
136     elsif ($server) {
137         $es->{nodes} = [$server];
138     }
139     else {
140         die "No elasticsearch servers were specified in koha-conf.xml.\n";
141     }
142     die "No elasticsearch index_name was specified in koha-conf.xml.\n"
143       if ( !$es->{index_name} );
144     # Append the name of this particular index to our namespace
145     $es->{index_name} .= '_' . $self->index;
146
147     $es->{key_prefix} = 'es_';
148     $es->{client} //= '5_0::Direct';
149     $es->{cxn_pool} //= 'Static';
150     $es->{request_timeout} //= 60;
151
152     return $es;
153 }
154
155 =head2 get_elasticsearch_settings
156
157     my $settings = $self->get_elasticsearch_settings();
158
159 This provides the settings provided to Elasticsearch when an index is created.
160 These can do things like define tokenization methods.
161
162 A hashref containing the settings is returned.
163
164 =cut
165
166 sub get_elasticsearch_settings {
167     my ($self) = @_;
168
169     # Use state to speed up repeated calls
170     state $settings = undef;
171     if (!defined $settings) {
172         my $config_file = C4::Context->config('elasticsearch_index_config');
173         $config_file ||= C4::Context->config('intranetdir') . '/admin/searchengine/elasticsearch/index_config.yaml';
174         $settings = LoadFile( $config_file );
175     }
176
177     return $settings;
178 }
179
180 =head2 get_elasticsearch_mappings
181
182     my $mappings = $self->get_elasticsearch_mappings();
183
184 This provides the mappings that get passed to Elasticsearch when an index is
185 created.
186
187 =cut
188
189 sub get_elasticsearch_mappings {
190     my ($self) = @_;
191
192     # Use state to speed up repeated calls
193     state %all_mappings;
194     state %sort_fields;
195
196     if (!defined $all_mappings{$self->index}) {
197         $sort_fields{$self->index} = {};
198         # Clone the general mapping to break ties with the original hash
199         my $mappings = {
200             data => clone(_get_elasticsearch_field_config('general', ''))
201         };
202         my $marcflavour = lc C4::Context->preference('marcflavour');
203         $self->_foreach_mapping(
204             sub {
205                 my ( $name, $type, $facet, $suggestible, $sort, $search, $marc_type ) = @_;
206                 return if $marc_type ne $marcflavour;
207                 # TODO if this gets any sort of complexity to it, it should
208                 # be broken out into its own function.
209
210                 # TODO be aware of date formats, but this requires pre-parsing
211                 # as ES will simply reject anything with an invalid date.
212                 my $es_type = 'text';
213                 if ($type eq 'boolean') {
214                     $es_type = 'boolean';
215                 } elsif ($type eq 'number' || $type eq 'sum') {
216                     $es_type = 'integer';
217                 } elsif ($type eq 'isbn' || $type eq 'stdno') {
218                     $es_type = 'stdno';
219                 }
220
221                 if ($search) {
222                     $mappings->{data}{properties}{$name} = _get_elasticsearch_field_config('search', $es_type);
223                 }
224
225                 if ($facet) {
226                     $mappings->{data}{properties}{ $name . '__facet' } = _get_elasticsearch_field_config('facet', $es_type);
227                 }
228                 if ($suggestible) {
229                     $mappings->{data}{properties}{ $name . '__suggestion' } = _get_elasticsearch_field_config('suggestible', $es_type);
230                 }
231                 # Sort is a bit special as it can be true, false, undef.
232                 # We care about "true" or "undef",
233                 # "undef" means to do the default thing, which is make it sortable.
234                 if (!defined $sort || $sort) {
235                     $mappings->{data}{properties}{ $name . '__sort' } = _get_elasticsearch_field_config('sort', $es_type);
236                     $sort_fields{$self->index}{$name} = 1;
237                 }
238             }
239         );
240         $all_mappings{$self->index} = $mappings;
241     }
242     $self->sort_fields(\%{$sort_fields{$self->index}});
243
244     return $all_mappings{$self->index};
245 }
246
247 =head2 _get_elasticsearch_field_config
248
249 Get the Elasticsearch field config for the given purpose and data type.
250
251 $mapping = _get_elasticsearch_field_config('search', 'text');
252
253 =cut
254
255 sub _get_elasticsearch_field_config {
256
257     my ( $purpose, $type ) = @_;
258
259     # Use state to speed up repeated calls
260     state $settings = undef;
261     if (!defined $settings) {
262         my $config_file = C4::Context->config('elasticsearch_field_config');
263         $config_file ||= C4::Context->config('intranetdir') . '/admin/searchengine/elasticsearch/field_config.yaml';
264         $settings = LoadFile( $config_file );
265     }
266
267     if (!defined $settings->{$purpose}) {
268         die "Field purpose $purpose not defined in field config";
269     }
270     if ($type eq '') {
271         return $settings->{$purpose};
272     }
273     if (defined $settings->{$purpose}{$type}) {
274         return $settings->{$purpose}{$type};
275     }
276     if (defined $settings->{$purpose}{'default'}) {
277         return $settings->{$purpose}{'default'};
278     }
279     return;
280 }
281
282 =head2 _load_elasticsearch_mappings
283
284 Load Elasticsearch mappings in the format of mappings.yaml.
285
286 $indexes = _load_elasticsearch_mappings();
287
288 =cut
289
290 sub _load_elasticsearch_mappings {
291     my $mappings_yaml = C4::Context->config('elasticsearch_index_mappings');
292     $mappings_yaml ||= C4::Context->config('intranetdir') . '/admin/searchengine/elasticsearch/mappings.yaml';
293     return LoadFile( $mappings_yaml );
294 }
295
296 sub reset_elasticsearch_mappings {
297     my ( $self ) = @_;
298     my $indexes = $self->_load_elasticsearch_mappings();
299
300     Koha::SearchMarcMaps->delete;
301     Koha::SearchFields->delete;
302
303     while ( my ( $index_name, $fields ) = each %$indexes ) {
304         while ( my ( $field_name, $data ) = each %$fields ) {
305
306             my %sf_params = map { $_ => $data->{$_} } grep { exists $data->{$_} } qw/ type label weight staff_client opac facet_order /;
307
308             # Set default values
309             $sf_params{staff_client} //= 1;
310             $sf_params{opac} //= 1;
311
312             $sf_params{name} = $field_name;
313
314             my $search_field = Koha::SearchFields->find_or_create( \%sf_params, { key => 'name' } );
315
316             my $mappings = $data->{mappings};
317             for my $mapping ( @$mappings ) {
318                 my $marc_field = Koha::SearchMarcMaps->find_or_create({
319                     index_name => $index_name,
320                     marc_type => $mapping->{marc_type},
321                     marc_field => $mapping->{marc_field}
322                 });
323                 $search_field->add_to_search_marc_maps($marc_field, {
324                     facet => $mapping->{facet} || 0,
325                     suggestible => $mapping->{suggestible} || 0,
326                     sort => $mapping->{sort},
327                     search => $mapping->{search} // 1
328                 });
329             }
330         }
331     }
332 }
333
334 # This overrides the accessor provided by Class::Accessor so that if
335 # sort_fields isn't set, then it'll generate it.
336 sub sort_fields {
337     my $self = shift;
338     if (@_) {
339         $self->_sort_fields_accessor(@_);
340         return;
341     }
342     my $val = $self->_sort_fields_accessor();
343     return $val if $val;
344
345     # This will populate the accessor as a side effect
346     $self->get_elasticsearch_mappings();
347     return $self->_sort_fields_accessor();
348 }
349
350 =head2 _process_mappings($mappings, $data, $record_document, $altscript)
351
352     $self->_process_mappings($mappings, $marc_field_data, $record_document, 0)
353
354 Process all C<$mappings> targets operating on a specific MARC field C<$data>.
355 Since we group all mappings by MARC field targets C<$mappings> will contain
356 all targets for C<$data> and thus we need to fetch the MARC field only once.
357 C<$mappings> will be applied to C<$record_document> and new field values added.
358 The method has no return value.
359
360 =over 4
361
362 =item C<$mappings>
363
364 Arrayref of mappings containing arrayrefs in the format
365 [C<$target>, C<$options>] where C<$target> is the name of the target field and
366 C<$options> is a hashref containing processing directives for this particular
367 mapping.
368
369 =item C<$data>
370
371 The source data from a MARC record field.
372
373 =item C<$record_document>
374
375 Hashref representing the Elasticsearch document on which mappings should be
376 applied.
377
378 =item C<$altscript>
379
380 A boolean value indicating whether an alternate script presentation is being
381 processed.
382
383 =back
384
385 =cut
386
387 sub _process_mappings {
388     my ($_self, $mappings, $data, $record_document, $altscript) = @_;
389     foreach my $mapping (@{$mappings}) {
390         my ($target, $options) = @{$mapping};
391
392         # Don't process sort fields for alternate scripts
393         my $sort = $target =~ /__sort$/;
394         if ($sort && $altscript) {
395             next;
396         }
397
398         # Copy (scalar) data since can have multiple targets
399         # with differing options for (possibly) mutating data
400         # so need a different copy for each
401         my $_data = $data;
402         $record_document->{$target} //= [];
403         if (defined $options->{substr}) {
404             my ($start, $length) = @{$options->{substr}};
405             $_data = length($data) > $start ? substr $data, $start, $length : '';
406         }
407         if (defined $options->{value_callbacks}) {
408             $_data = reduce { $b->($a) } ($_data, @{$options->{value_callbacks}});
409         }
410         if (defined $options->{property}) {
411             $_data = {
412                 $options->{property} => $_data
413             }
414         }
415         push @{$record_document->{$target}}, $_data;
416     }
417 }
418
419 =head2 marc_records_to_documents($marc_records)
420
421     my $record_documents = $self->marc_records_to_documents($marc_records);
422
423 Using mappings stored in database convert C<$marc_records> to Elasticsearch documents.
424
425 Returns array of hash references, representing Elasticsearch documents,
426 acceptable as body payload in C<Search::Elasticsearch> requests.
427
428 =over 4
429
430 =item C<$marc_documents>
431
432 Reference to array of C<MARC::Record> objects to be converted to Elasticsearch documents.
433
434 =back
435
436 =cut
437
438 sub marc_records_to_documents {
439     my ($self, $records) = @_;
440     my $rules = $self->_get_marc_mapping_rules();
441     my $control_fields_rules = $rules->{control_fields};
442     my $data_fields_rules = $rules->{data_fields};
443     my $marcflavour = lc C4::Context->preference('marcflavour');
444     my $use_array = C4::Context->preference('ElasticsearchMARCFormat') eq 'ARRAY';
445
446     my @record_documents;
447
448     foreach my $record (@{$records}) {
449         my $record_document = {};
450         my $mappings = $rules->{leader};
451         if ($mappings) {
452             $self->_process_mappings($mappings, $record->leader(), $record_document, 0);
453         }
454         foreach my $field ($record->fields()) {
455             if ($field->is_control_field()) {
456                 my $mappings = $control_fields_rules->{$field->tag()};
457                 if ($mappings) {
458                     $self->_process_mappings($mappings, $field->data(), $record_document, 0);
459                 }
460             }
461             else {
462                 my $tag = $field->tag();
463                 # Handle alternate scripts in MARC 21
464                 my $altscript = 0;
465                 if ($marcflavour eq 'marc21' && $tag eq '880') {
466                     my $sub6 = $field->subfield('6');
467                     if ($sub6 =~ /^(...)-\d+/) {
468                         $tag = $1;
469                         $altscript = 1;
470                     }
471                 }
472
473                 my $data_field_rules = $data_fields_rules->{$tag};
474                 if ($data_field_rules) {
475                     my $subfields_mappings = $data_field_rules->{subfields};
476                     my $wildcard_mappings = $subfields_mappings->{'*'};
477                     foreach my $subfield ($field->subfields()) {
478                         my ($code, $data) = @{$subfield};
479                         my $mappings = $subfields_mappings->{$code} // [];
480                         if ($wildcard_mappings) {
481                             $mappings = [@{$mappings}, @{$wildcard_mappings}];
482                         }
483                         if (@{$mappings}) {
484                             $self->_process_mappings($mappings, $data, $record_document, $altscript);
485                         }
486                         if ( defined @{$mappings}[0] && grep /match-heading/, @{@{$mappings}[0]} ){
487                             # Used by the authority linker the match-heading field requires a specific syntax
488                             # that is specified in C4/Heading
489                             my $heading = C4::Heading->new_from_field( $field, undef, 1 ); #new auth heading
490                             next unless $heading;
491                             push @{$record_document->{'match-heading'}}, $heading->search_form;
492                         }
493                     }
494
495                     my $subfields_join_mappings = $data_field_rules->{subfields_join};
496                     if ($subfields_join_mappings) {
497                         foreach my $subfields_group (keys %{$subfields_join_mappings}) {
498                             # Map each subfield to values, remove empty values, join with space
499                             my $data = join(
500                                 ' ',
501                                 grep(
502                                     $_,
503                                     map { join(' ', $field->subfield($_)) } split(//, $subfields_group)
504                                 )
505                             );
506                             if ($data) {
507                                 $self->_process_mappings($subfields_join_mappings->{$subfields_group}, $data, $record_document, $altscript);
508                             }
509                             if ( grep { $_->[0] eq 'match-heading' } @{$subfields_join_mappings->{$subfields_group}} ){
510                                 # Used by the authority linker the match-heading field requires a specific syntax
511                                 # that is specified in C4/Heading
512                                 my $heading = C4::Heading->new_from_field( $field, undef, 1 ); #new auth heading
513                                 next unless $heading;
514                                 push @{$record_document->{'match-heading'}}, $heading->search_form;
515                             }
516                         }
517                     }
518                 }
519             }
520         }
521         foreach my $field (keys %{$rules->{defaults}}) {
522             unless (defined $record_document->{$field}) {
523                 $record_document->{$field} = $rules->{defaults}->{$field};
524             }
525         }
526         foreach my $field (@{$rules->{sum}}) {
527             if (defined $record_document->{$field}) {
528                 # TODO: validate numeric? filter?
529                 # TODO: Or should only accept fields without nested values?
530                 # TODO: Quick and dirty, improve if needed
531                 $record_document->{$field} = sum0(grep { !ref($_) && m/\d+(\.\d+)?/} @{$record_document->{$field}});
532             }
533         }
534         # Index all applicable ISBN forms (ISBN-10 and ISBN-13 with and without dashes)
535         foreach my $field (@{$rules->{isbn}}) {
536             if (defined $record_document->{$field}) {
537                 my @isbns = ();
538                 foreach my $input_isbn (@{$record_document->{$field}}) {
539                     my $isbn = Business::ISBN->new($input_isbn);
540                     if (defined $isbn && $isbn->is_valid) {
541                         my $isbn13 = $isbn->as_isbn13->as_string;
542                         push @isbns, $isbn13;
543                         $isbn13 =~ s/\-//g;
544                         push @isbns, $isbn13;
545
546                         my $isbn10 = $isbn->as_isbn10;
547                         if ($isbn10) {
548                             $isbn10 = $isbn10->as_string;
549                             push @isbns, $isbn10;
550                             $isbn10 =~ s/\-//g;
551                             push @isbns, $isbn10;
552                         }
553                     } else {
554                         push @isbns, $input_isbn;
555                     }
556                 }
557                 $record_document->{$field} = \@isbns;
558             }
559         }
560
561         # Remove duplicate values and collapse sort fields
562         foreach my $field (keys %{$record_document}) {
563             if (ref($record_document->{$field}) eq 'ARRAY') {
564                 @{$record_document->{$field}} = do {
565                     my %seen;
566                     grep { !$seen{ref($_) eq 'HASH' && defined $_->{input} ? $_->{input} : $_}++ } @{$record_document->{$field}};
567                 };
568                 if ($field =~ /__sort$/) {
569                     # Make sure to keep the sort field length sensible. 255 was chosen as a nice round value.
570                     $record_document->{$field} = [substr(join(' ', @{$record_document->{$field}}), 0, 255)];
571                 }
572             }
573         }
574
575         # TODO: Perhaps should check if $records_document non empty, but really should never be the case
576         $record->encoding('UTF-8');
577         if ($use_array) {
578             $record_document->{'marc_data_array'} = $self->_marc_to_array($record);
579             $record_document->{'marc_format'} = 'ARRAY';
580         } else {
581             my @warnings;
582             {
583                 # Temporarily intercept all warn signals (MARC::Record carps when record length > 99999)
584                 local $SIG{__WARN__} = sub {
585                     push @warnings, $_[0];
586                 };
587                 $record_document->{'marc_data'} = encode_base64(encode('UTF-8', $record->as_usmarc()));
588             }
589             if (@warnings) {
590                 # Suppress warnings if record length exceeded
591                 unless (substr($record->leader(), 0, 5) eq '99999') {
592                     foreach my $warning (@warnings) {
593                         carp $warning;
594                     }
595                 }
596                 $record_document->{'marc_data'} = $record->as_xml_record($marcflavour);
597                 $record_document->{'marc_format'} = 'MARCXML';
598             }
599             else {
600                 $record_document->{'marc_format'} = 'base64ISO2709';
601             }
602         }
603         push @record_documents, $record_document;
604     }
605     return \@record_documents;
606 }
607
608 =head2 _marc_to_array($record)
609
610     my @fields = _marc_to_array($record)
611
612 Convert a MARC::Record to an array modeled after MARC-in-JSON
613 (see https://github.com/marc4j/marc4j/wiki/MARC-in-JSON-Description)
614
615 =over 4
616
617 =item C<$record>
618
619 A MARC::Record object
620
621 =back
622
623 =cut
624
625 sub _marc_to_array {
626     my ($self, $record) = @_;
627
628     my $data = {
629         leader => $record->leader(),
630         fields => []
631     };
632     for my $field ($record->fields()) {
633         my $tag = $field->tag();
634         if ($field->is_control_field()) {
635             push @{$data->{fields}}, {$tag => $field->data()};
636         } else {
637             my $subfields = ();
638             foreach my $subfield ($field->subfields()) {
639                 my ($code, $contents) = @{$subfield};
640                 push @{$subfields}, {$code => $contents};
641             }
642             push @{$data->{fields}}, {
643                 $tag => {
644                     ind1 => $field->indicator(1),
645                     ind2 => $field->indicator(2),
646                     subfields => $subfields
647                 }
648             };
649         }
650     }
651     return $data;
652 }
653
654 =head2 _array_to_marc($data)
655
656     my $record = _array_to_marc($data)
657
658 Convert an array modeled after MARC-in-JSON to a MARC::Record
659
660 =over 4
661
662 =item C<$data>
663
664 An array modeled after MARC-in-JSON
665 (see https://github.com/marc4j/marc4j/wiki/MARC-in-JSON-Description)
666
667 =back
668
669 =cut
670
671 sub _array_to_marc {
672     my ($self, $data) = @_;
673
674     my $record = MARC::Record->new();
675
676     $record->leader($data->{leader});
677     for my $field (@{$data->{fields}}) {
678         my $tag = (keys %{$field})[0];
679         $field = $field->{$tag};
680         my $marc_field;
681         if (ref($field) eq 'HASH') {
682             my @subfields;
683             foreach my $subfield (@{$field->{subfields}}) {
684                 my $code = (keys %{$subfield})[0];
685                 push @subfields, $code;
686                 push @subfields, $subfield->{$code};
687             }
688             $marc_field = MARC::Field->new($tag, $field->{ind1}, $field->{ind2}, @subfields);
689         } else {
690             $marc_field = MARC::Field->new($tag, $field)
691         }
692         $record->append_fields($marc_field);
693     }
694 ;
695     return $record;
696 }
697
698 =head2 _field_mappings($facet, $suggestible, $sort, $search, $target_name, $target_type, $range)
699
700     my @mappings = _field_mappings($facet, $suggestible, $sort, $search, $target_name, $target_type, $range)
701
702 Get mappings, an internal data structure later used by
703 L<_process_mappings($mappings, $data, $record_document, $altscript)> to process MARC target
704 data for a MARC mapping.
705
706 The returned C<$mappings> is not to to be confused with mappings provided by
707 C<_foreach_mapping>, rather this sub accepts properties from a mapping as
708 provided by C<_foreach_mapping> and expands it to this internal data structure.
709 In the caller context (C<_get_marc_mapping_rules>) the returned C<@mappings>
710 is then applied to each MARC target (leader, control field data, subfield or
711 joined subfields) and integrated into the mapping rules data structure used in
712 C<marc_records_to_documents> to transform MARC records into Elasticsearch
713 documents.
714
715 =over 4
716
717 =item C<$facet>
718
719 Boolean indicating whether to create a facet field for this mapping.
720
721 =item C<$suggestible>
722
723 Boolean indicating whether to create a suggestion field for this mapping.
724
725 =item C<$sort>
726
727 Boolean indicating whether to create a sort field for this mapping.
728
729 =item C<$search>
730
731 Boolean indicating whether to create a search field for this mapping.
732
733 =item C<$target_name>
734
735 Elasticsearch document target field name.
736
737 =item C<$target_type>
738
739 Elasticsearch document target field type.
740
741 =item C<$range>
742
743 An optional range as a string in the format "<START>-<END>" or "<START>",
744 where "<START>" and "<END>" are integers specifying a range that will be used
745 for extracting a substring from MARC data as Elasticsearch field target value.
746
747 The first character position is "0", and the range is inclusive,
748 so "0-2" means the first three characters of MARC data.
749
750 If only "<START>" is provided only one character at position "<START>" will
751 be extracted.
752
753 =back
754
755 =cut
756
757 sub _field_mappings {
758     my ($_self, $facet, $suggestible, $sort, $search, $target_name, $target_type, $range) = @_;
759     my %mapping_defaults = ();
760     my @mappings;
761
762     my $substr_args = undef;
763     if (defined $range) {
764         # TODO: use value_callback instead?
765         my ($start, $end) = map(int, split /-/, $range, 2);
766         $substr_args = [$start];
767         push @{$substr_args}, (defined $end ? $end - $start + 1 : 1);
768     }
769     my $default_options = {};
770     if ($substr_args) {
771         $default_options->{substr} = $substr_args;
772     }
773
774     # TODO: Should probably have per type value callback/hook
775     # but hard code for now
776     if ($target_type eq 'boolean') {
777         $default_options->{value_callbacks} //= [];
778         push @{$default_options->{value_callbacks}}, sub {
779             my ($value) = @_;
780             # Trim whitespace at both ends
781             $value =~ s/^\s+|\s+$//g;
782             return $value ? 'true' : 'false';
783         };
784     }
785
786     if ($search) {
787         my $mapping = [$target_name, $default_options];
788         push @mappings, $mapping;
789     }
790
791     my @suffixes = ();
792     push @suffixes, 'facet' if $facet;
793     push @suffixes, 'suggestion' if $suggestible;
794     push @suffixes, 'sort' if !defined $sort || $sort;
795
796     foreach my $suffix (@suffixes) {
797         my $mapping = ["${target_name}__$suffix"];
798         # TODO: Hack, fix later in less hideous manner
799         if ($suffix eq 'suggestion') {
800             push @{$mapping}, {%{$default_options}, property => 'input'};
801         }
802         else {
803             push @{$mapping}, $default_options;
804         }
805         push @mappings, $mapping;
806     }
807     return @mappings;
808 };
809
810 =head2 _get_marc_mapping_rules
811
812     my $mapping_rules = $self->_get_marc_mapping_rules()
813
814 Generates rules from mappings stored in database for MARC records to Elasticsearch JSON document conversion.
815
816 Since field retrieval is slow in C<MARC::Records> (all fields are itereted through for
817 each call to C<MARC::Record>->field) we create an optimized structure of mapping
818 rules keyed by MARC field tags holding all the mapping rules for that particular tag.
819
820 We can then iterate through all MARC fields for each record and apply all relevant
821 rules once per fields instead of retreiving fields multiple times for each mapping rule
822 which is terribly slow.
823
824 =cut
825
826 # TODO: This structure can be used for processing multiple MARC::Records so is currently
827 # rebuilt for each batch. Since it is cacheable it could also be stored in an in
828 # memory cache which it is currently not. The performance gain of caching
829 # would probably be marginal, but to do this could be a further improvement.
830
831 sub _get_marc_mapping_rules {
832     my ($self) = @_;
833     my $marcflavour = lc C4::Context->preference('marcflavour');
834     my $field_spec_regexp = qr/^([0-9]{3})([()0-9a-zA-Z]+)?(?:_\/(\d+(?:-\d+)?))?$/;
835     my $leader_regexp = qr/^leader(?:_\/(\d+(?:-\d+)?))?$/;
836     my $rules = {
837         'leader' => [],
838         'control_fields' => {},
839         'data_fields' => {},
840         'sum' => [],
841         'isbn' => [],
842         'defaults' => {}
843     };
844
845     $self->_foreach_mapping(sub {
846         my ($name, $type, $facet, $suggestible, $sort, $search, $marc_type, $marc_field) = @_;
847         return if $marc_type ne $marcflavour;
848
849         if ($type eq 'sum') {
850             push @{$rules->{sum}}, $name;
851             push @{$rules->{sum}}, $name."__sort" if $sort;
852         }
853         elsif ($type eq 'isbn') {
854             push @{$rules->{isbn}}, $name;
855         }
856         elsif ($type eq 'boolean') {
857             # boolean gets special handling, if value doesn't exist for a field,
858             # it is set to false
859             $rules->{defaults}->{$name} = 'false';
860         }
861
862         if ($marc_field =~ $field_spec_regexp) {
863             my $field_tag = $1;
864
865             my @subfields;
866             my @subfield_groups;
867             # Parse and separate subfields form subfield groups
868             if (defined $2) {
869                 my $subfield_group = '';
870                 my $open_group = 0;
871
872                 foreach my $token (split //, $2) {
873                     if ($token eq "(") {
874                         if ($open_group) {
875                             Koha::Exceptions::Elasticsearch::MARCFieldExprParseError->throw(
876                                 "Unmatched opening parenthesis for $marc_field"
877                             );
878                         }
879                         else {
880                             $open_group = 1;
881                         }
882                     }
883                     elsif ($token eq ")") {
884                         if ($open_group) {
885                             if ($subfield_group) {
886                                 push @subfield_groups, $subfield_group;
887                                 $subfield_group = '';
888                             }
889                             $open_group = 0;
890                         }
891                         else {
892                             Koha::Exceptions::Elasticsearch::MARCFieldExprParseError->throw(
893                                 "Unmatched closing parenthesis for $marc_field"
894                             );
895                         }
896                     }
897                     elsif ($open_group) {
898                         $subfield_group .= $token;
899                     }
900                     else {
901                         push @subfields, $token;
902                     }
903                 }
904             }
905             else {
906                 push @subfields, '*';
907             }
908
909             my $range = defined $3 ? $3 : undef;
910             my @mappings = $self->_field_mappings($facet, $suggestible, $sort, $search, $name, $type, $range);
911             if ($field_tag < 10) {
912                 $rules->{control_fields}->{$field_tag} //= [];
913                 push @{$rules->{control_fields}->{$field_tag}}, @mappings;
914             }
915             else {
916                 $rules->{data_fields}->{$field_tag} //= {};
917                 foreach my $subfield (@subfields) {
918                     $rules->{data_fields}->{$field_tag}->{subfields}->{$subfield} //= [];
919                     push @{$rules->{data_fields}->{$field_tag}->{subfields}->{$subfield}}, @mappings;
920                 }
921                 foreach my $subfield_group (@subfield_groups) {
922                     $rules->{data_fields}->{$field_tag}->{subfields_join}->{$subfield_group} //= [];
923                     push @{$rules->{data_fields}->{$field_tag}->{subfields_join}->{$subfield_group}}, @mappings;
924                 }
925             }
926         }
927         elsif ($marc_field =~ $leader_regexp) {
928             my $range = defined $1 ? $1 : undef;
929             my @mappings = $self->_field_mappings($facet, $suggestible, $sort, $search, $name, $type, $range);
930             push @{$rules->{leader}}, @mappings;
931         }
932         else {
933             Koha::Exceptions::Elasticsearch::MARCFieldExprParseError->throw(
934                 "Invalid MARC field expression: $marc_field"
935             );
936         }
937     });
938     return $rules;
939 }
940
941 =head2 _foreach_mapping
942
943     $self->_foreach_mapping(
944         sub {
945             my ( $name, $type, $facet, $suggestible, $sort, $marc_type,
946                 $marc_field )
947               = @_;
948             return unless $marc_type eq 'marc21';
949             print "Data comes from: " . $marc_field . "\n";
950         }
951     );
952
953 This allows you to apply a function to each entry in the elasticsearch mappings
954 table, in order to build the mappings for whatever is needed.
955
956 In the provided function, the files are:
957
958 =over 4
959
960 =item C<$name>
961
962 The field name for elasticsearch (corresponds to the 'mapping' column in the
963 database.
964
965 =item C<$type>
966
967 The type for this value, e.g. 'string'.
968
969 =item C<$facet>
970
971 True if this value should be facetised. This only really makes sense if the
972 field is understood by the facet processing code anyway.
973
974 =item C<$sort>
975
976 True if this is a field that a) needs special sort handling, and b) if it
977 should be sorted on. False if a) but not b). Undef if not a). This allows,
978 for example, author to be sorted on but not everything marked with "author"
979 to be included in that sort.
980
981 =item C<$marc_type>
982
983 A string that indicates the MARC type that this mapping is for, e.g. 'marc21',
984 'unimarc', 'normarc'.
985
986 =item C<$marc_field>
987
988 A string that describes the MARC field that contains the data to extract.
989 These are of a form suited to Catmandu's MARC fixers.
990
991 =back
992
993 =cut
994
995 sub _foreach_mapping {
996     my ( $self, $sub ) = @_;
997
998     # TODO use a caching framework here
999     my $search_fields = Koha::Database->schema->resultset('SearchField')->search(
1000         {
1001             'search_marc_map.index_name' => $self->index,
1002         },
1003         {   join => { search_marc_to_fields => 'search_marc_map' },
1004             '+select' => [
1005                 'search_marc_to_fields.facet',
1006                 'search_marc_to_fields.suggestible',
1007                 'search_marc_to_fields.sort',
1008                 'search_marc_to_fields.search',
1009                 'search_marc_map.marc_type',
1010                 'search_marc_map.marc_field',
1011             ],
1012             '+as'     => [
1013                 'facet',
1014                 'suggestible',
1015                 'sort',
1016                 'search',
1017                 'marc_type',
1018                 'marc_field',
1019             ],
1020         }
1021     );
1022
1023     while ( my $search_field = $search_fields->next ) {
1024         $sub->(
1025             # Force lower case on indexed field names for case insensitive
1026             # field name searches
1027             lc($search_field->name),
1028             $search_field->type,
1029             $search_field->get_column('facet'),
1030             $search_field->get_column('suggestible'),
1031             $search_field->get_column('sort'),
1032             $search_field->get_column('search'),
1033             $search_field->get_column('marc_type'),
1034             $search_field->get_column('marc_field'),
1035         );
1036     }
1037 }
1038
1039 =head2 process_error
1040
1041     die process_error($@);
1042
1043 This parses an Elasticsearch error message and produces a human-readable
1044 result from it. This result is probably missing all the useful information
1045 that you might want in diagnosing an issue, so the warning is also logged.
1046
1047 Note that currently the resulting message is not internationalised. This
1048 will happen eventually by some method or other.
1049
1050 =cut
1051
1052 sub process_error {
1053     my ($self, $msg) = @_;
1054
1055     warn $msg; # simple logging
1056
1057     # This is super-primitive
1058     return "Unable to understand your search query, please rephrase and try again.\n" if $msg =~ /ParseException/;
1059
1060     return "Unable to perform your search. Please try again.\n";
1061 }
1062
1063 =head2 _read_configuration
1064
1065     my $conf = _read_configuration();
1066
1067 Reads the I<configuration file> and returns a hash structure with the
1068 configuration information. It raises an exception if mandatory entries
1069 are missing.
1070
1071 The hashref structure has the following form:
1072
1073     {
1074         'nodes' => ['127.0.0.1:9200', 'anotherserver:9200'],
1075         'index_name' => 'koha_instance',
1076     }
1077
1078 This is configured by the following in the C<config> block in koha-conf.xml:
1079
1080     <elasticsearch>
1081         <server>127.0.0.1:9200</server>
1082         <server>anotherserver:9200</server>
1083         <index_name>koha_instance</index_name>
1084     </elasticsearch>
1085
1086 =cut
1087
1088 sub _read_configuration {
1089
1090     my $configuration;
1091
1092     my $conf = C4::Context->config('elasticsearch');
1093     Koha::Exceptions::Config::MissingEntry->throw(
1094         "Missing 'elasticsearch' block in config file")
1095       unless defined $conf;
1096
1097     if ( $conf && $conf->{server} ) {
1098         my $nodes = $conf->{server};
1099         if ( ref($nodes) eq 'ARRAY' ) {
1100             $configuration->{nodes} = $nodes;
1101         }
1102         else {
1103             $configuration->{nodes} = [$nodes];
1104         }
1105     }
1106     else {
1107         Koha::Exceptions::Config::MissingEntry->throw(
1108             "Missing 'server' entry in config file for elasticsearch");
1109     }
1110
1111     if ( defined $conf->{index_name} ) {
1112         $configuration->{index_name} = $conf->{index_name};
1113     }
1114     else {
1115         Koha::Exceptions::Config::MissingEntry->throw(
1116             "Missing 'index_name' entry in config file for elasticsearch");
1117     }
1118
1119     return $configuration;
1120 }
1121
1122 =head2 get_facetable_fields
1123
1124 my @facetable_fields = Koha::SearchEngine::Elasticsearch->get_facetable_fields();
1125
1126 Returns the list of Koha::SearchFields marked to be faceted in the ES configuration
1127
1128 =cut
1129
1130 sub get_facetable_fields {
1131     my ($self) = @_;
1132
1133     # These should correspond to the ES field names, as opposed to the CCL
1134     # things that zebra uses.
1135     my @search_field_names = qw( author itype location su-geo title-series subject ccode holdingbranch homebranch ln );
1136     my @faceted_fields = Koha::SearchFields->search(
1137         { name => { -in => \@search_field_names }, facet_order => { '!=' => undef } }, { order_by => ['facet_order'] }
1138     );
1139     my @not_faceted_fields = Koha::SearchFields->search(
1140         { name => { -in => \@search_field_names }, facet_order => undef }, { order_by => ['facet_order'] }
1141     );
1142     # This could certainly be improved
1143     return ( @faceted_fields, @not_faceted_fields );
1144 }
1145
1146 1;
1147
1148 __END__
1149
1150 =head1 AUTHOR
1151
1152 =over 4
1153
1154 =item Chris Cormack C<< <chrisc@catalyst.net.nz> >>
1155
1156 =item Robin Sheat C<< <robin@catalyst.net.nz> >>
1157
1158 =item Jonathan Druart C<< <jonathan.druart@bugs.koha-community.org> >>
1159
1160 =back
1161
1162 =cut