Bug 23137: Add reset option to rebuild_elasticsearch.pl
[koha.git] / misc / search_tools / rebuild_elasticsearch.pl
1 #!/usr/bin/perl
2
3 # This inserts records from a Koha database into elastic search
4
5 # Copyright 2014 Catalyst IT
6 #
7 # This file is part of Koha.
8 #
9 # Koha is free software; you can redistribute it and/or modify it
10 # under the terms of the GNU General Public License as published by
11 # the Free Software Foundation; either version 3 of the License, or
12 # (at your option) any later version.
13 #
14 # Koha is distributed in the hope that it will be useful, but
15 # WITHOUT ANY WARRANTY; without even the implied warranty of
16 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
17 # GNU General Public License for more details.
18 #
19 # You should have received a copy of the GNU General Public License
20 # along with Koha; if not, see <http://www.gnu.org/licenses>.
21
22 =head1 NAME
23
24 rebuild_elasticsearch.pl - inserts records from a Koha database into Elasticsearch
25
26 =head1 SYNOPSIS
27
28 B<rebuild_elasticsearch.pl>
29 [B<-c|--commit>=C<count>]
30 [B<-d|--delete>]
31 [B<-r|--reset>]
32 [B<-a|--authorities>]
33 [B<-b|--biblios>]
34 [B<-bn|--bnumber>]
35 [B<-ai|--authid>]
36 [B<-p|--processes>]
37 [B<-v|--verbose>]
38 [B<-h|--help>]
39 [B<--man>]
40
41 =head1 DESCRIPTION
42
43 Inserts records from a Koha database into Elasticsearch.
44
45 =head1 OPTIONS
46
47 =over
48
49 =item B<-c|--commit>=C<count>
50
51 Specify how many records will be batched up before they're added to Elasticsearch.
52 Higher should be faster, but will cause more RAM usage. Default is 5000.
53
54 =item B<-d|--delete>
55
56 Delete the index and recreate it before indexing.
57
58 =item B<-r|--reset>
59
60 Reload mappings from files (specified in koha-conf.xml) before indexing.
61 Implies --delete.
62
63 =item B<-a|--authorities>
64
65 Index the authorities only. Combining this with B<-b> is the same as
66 specifying neither and so both get indexed.
67
68 =item B<-b|--biblios>
69
70 Index the biblios only. Combining this with B<-a> is the same as
71 specifying neither and so both get indexed.
72
73 =item B<-bn|--bnumber>
74
75 Only index the supplied biblionumber, mostly for testing purposes. May be
76 repeated.
77
78 =item B<-ai|--authid>
79
80 Only index the supplied authority id, mostly for testing purposes. May be
81 repeated.
82
83 =item B<-p|--processes>
84
85 Number of processes to use for indexing. This can be used to do more indexing
86 work in parallel on multicore systems. By default, a single process is used.
87
88 =item B<-v|--verbose>
89
90 By default, this program only emits warnings and errors. This makes it talk
91 more. Add more to make it even more wordy, in particular when debugging.
92
93 =item B<-h|--help>
94
95 Help!
96
97 =item B<--man>
98
99 Full documentation.
100
101 =back
102
103 =head1 IMPLEMENTATION
104
105 =cut
106
107 use autodie;
108 use Getopt::Long;
109 use Koha::Script;
110 use C4::Context;
111 use Koha::MetadataRecord::Authority;
112 use Koha::BiblioUtils;
113 use Koha::SearchEngine::Elasticsearch;
114 use Koha::SearchEngine::Elasticsearch::Indexer;
115 use Koha::Caches;
116 use MARC::Field;
117 use MARC::Record;
118 use Modern::Perl;
119 use Pod::Usage;
120
121 my $verbose = 0;
122 my $commit = 5000;
123 my ($delete, $reset, $help, $man, $processes);
124 my ($index_biblios, $index_authorities);
125 my (@biblionumbers,@authids);
126
127 $|=1; # flushes output
128
129 GetOptions(
130     'c|commit=i'    => \$commit,
131     'd|delete'      => \$delete,
132     'r|reset'       => \$reset,
133     'a|authorities' => \$index_authorities,
134     'b|biblios'     => \$index_biblios,
135     'bn|bnumber=i' => \@biblionumbers,
136     'ai|authid=i'  => \@authids,
137     'p|processes=i' => \$processes,
138     'v|verbose+'    => \$verbose,
139     'h|help'        => \$help,
140     'man'           => \$man,
141 );
142
143 # Default is to do both
144 unless ($index_authorities || $index_biblios) {
145     $index_authorities = $index_biblios = 1;
146 }
147
148 if ($processes && ( @biblionumbers || @authids) ) {
149     die "Argument p|processes cannot be combined with bn|bnumber or ai|authid";
150 }
151
152 pod2usage(1) if $help;
153 pod2usage( -exitstatus => 0, -verbose => 2 ) if $man;
154
155 _sanity_check();
156
157 if ($reset){
158     Koha::SearchEngine::Elasticsearch->reset_elasticsearch_mappings;
159     my $cache = Koha::Caches->get_instance();
160     $cache->clear_from_cache('elasticsearch_search_fields_staff_client');
161     $cache->clear_from_cache('elasticsearch_search_fields_opac');
162     $delete = 1;
163 }
164
165 _verify_index_state($Koha::SearchEngine::Elasticsearch::BIBLIOS_INDEX, $delete) if ($index_biblios);
166 _verify_index_state($Koha::SearchEngine::Elasticsearch::AUTHORITIES_INDEX, $delete) if ($index_authorities);
167
168 my $slice_index = 0;
169 my $slice_count = ( $processes //= 1 );
170 my %iterator_options;
171
172 if ($slice_count > 1) {
173     # Fire up child processes for processing slices from 2 on. This main process will handle slice 1.
174     $slice_index = 0;
175     for (my $proc = 1; $proc < $slice_count; $proc++) {
176         my $pid = fork();
177         die "Failed to fork a child process\n" unless defined $pid;
178         if ($pid == 0) {
179             # Child process, give it a slice to process
180             $slice_index = $proc;
181             last;
182         }
183     }
184     # Fudge the commit count a bit to spread out the Elasticsearch commits
185     $commit *= 1 + 0.10 * $slice_index;
186     _log(1, "Processing slice @{[$slice_index + 1]} of $slice_count\n");
187     $iterator_options{slice} = { index => $slice_index, count => $slice_count };
188 }
189
190 my $next;
191 if ($index_biblios) {
192     _log(1, "Indexing biblios\n");
193     if (@biblionumbers) {
194         $next = sub {
195             my $r = shift @biblionumbers;
196             return () unless defined $r;
197             return ($r, Koha::BiblioUtils->get_from_biblionumber($r, item_data => 1 ));
198         };
199     } else {
200         my $records = Koha::BiblioUtils->get_all_biblios_iterator(%iterator_options);
201         $next = sub {
202             $records->next();
203         }
204     }
205     _do_reindex($next, $Koha::SearchEngine::Elasticsearch::BIBLIOS_INDEX);
206 }
207 if ($index_authorities) {
208     _log(1, "Indexing authorities\n");
209     if (@authids) {
210         $next = sub {
211             my $r = shift @authids;
212             return () unless defined $r;
213             my $a = Koha::MetadataRecord::Authority->get_from_authid($r);
214             return ($r, $a);
215         };
216     } else {
217         my $records = Koha::MetadataRecord::Authority->get_all_authorities_iterator(%iterator_options);
218         $next = sub {
219             $records->next();
220         }
221     }
222     _do_reindex($next, $Koha::SearchEngine::Elasticsearch::AUTHORITIES_INDEX);
223 }
224
225 if ($slice_index == 0) {
226     # Main process, wait for children
227     for (my $proc = 1; $proc < $processes; $proc++) {
228         wait();
229     }
230 }
231
232 =head1 INTERNAL METHODS
233
234 =head2 _verify_index_state
235
236     _verify_index_state($Koha::SearchEngine::Elasticsearch::BIBLIOS_INDEX, 1);
237
238 Checks the index state and recreates it if requested.
239
240 =cut
241
242 sub _verify_index_state {
243     my ( $index_name, $recreate ) = @_;
244
245     _log(1, "Checking state of $index_name index\n");
246     my $indexer = Koha::SearchEngine::Elasticsearch::Indexer->new( { index => $index_name } );
247
248     if ($recreate) {
249         _log(1, "Dropping and recreating $index_name index\n");
250         $indexer->drop_index() if $indexer->index_exists();
251         $indexer->create_index();
252     }
253     elsif (!$indexer->index_exists) {
254         # Create index if does not exist
255         $indexer->create_index();
256     } elsif ($indexer->is_index_status_ok) {
257         # Update mapping unless index is some kind of problematic state
258         $indexer->update_mappings();
259     } elsif ($indexer->is_index_status_recreate_required) {
260         warn qq/Index "$index_name" has status "recreate required", suggesting it should be recreated/;
261     }
262 }
263
264 =head2 _do_reindex
265
266     _do_reindex($callback, $Koha::SearchEngine::Elasticsearch::BIBLIOS_INDEX);
267
268 Does the actual reindexing. $callback is a function that always returns the next record.
269 For each index we iterate through the records, committing at specified count
270
271 =cut
272
273 sub _do_reindex {
274     my ( $next, $index_name ) = @_;
275
276     my $indexer = Koha::SearchEngine::Elasticsearch::Indexer->new( { index => $index_name } );
277
278     my $count        = 0;
279     my $commit_count = $commit;
280     my ( @id_buffer, @commit_buffer );
281     while ( my $record = $next->() ) {
282         my $id     = $record->id // $record->authid;
283         my $record = $record->record;
284         $count++;
285         if ( $verbose == 1 ) {
286             _log( 1, "$count records processed\n" ) if ( $count % 1000 == 0);
287         } else {
288             _log( 2, "$id\n" );
289         }
290
291         push @id_buffer,     $id;
292         push @commit_buffer, $record;
293         if ( !( --$commit_count ) ) {
294             _log( 1, "Committing $commit records...\n" );
295             my $response = $indexer->update_index( \@id_buffer, \@commit_buffer );
296             _handle_response($response);
297             $commit_count  = $commit;
298             @id_buffer     = ();
299             @commit_buffer = ();
300             _log( 1, "Commit complete\n" );
301         }
302     }
303
304     # There are probably uncommitted records
305     _log( 1, "Committing final records...\n" );
306     my $response = $indexer->update_index( \@id_buffer, \@commit_buffer );
307     _handle_response($response);
308     _log( 1, "Total $count records indexed\n" );
309 }
310
311 =head2 _sanity_check
312
313     _sanity_check();
314
315 Checks some basic stuff to ensure that it's sane before we start.
316
317 =cut
318
319 sub _sanity_check {
320     # Do we have an elasticsearch block defined?
321     my $conf = C4::Context->config('elasticsearch');
322     die "No 'elasticsearch' block is defined in koha-conf.xml.\n" if ( !$conf );
323 }
324
325 =head2 _handle_response
326
327 Parse the return from update_index and display errors depending on verbosity of the script
328
329 =cut
330
331 sub _handle_response {
332     my ($response) = @_;
333     if( $response->{errors} eq 'true' ){
334         _log( 1, "There were errors during indexing\n" );
335         if ( $verbose > 1 ){
336             foreach my $item (@{$response->{items}}){
337                 next unless defined $item->{index}->{error};
338                 print "Record #" . $item->{index}->{_id} . " " .
339                       $item->{index}->{error}->{reason} . " (" . $item->{index}->{error}->{type} . ") : " .
340                       $item->{index}->{error}->{caused_by}->{type} . " (" . $item->{index}->{error}->{caused_by}->{reason} . ")\n";
341             }
342         }
343     }
344 }
345
346 =head2 _log
347
348     _log($level, "Message\n");
349
350 Output progress information.
351
352 Will output the message if verbosity level is set to $level or more. Will not
353 include a trailing newline automatically.
354
355 =cut
356
357 sub _log {
358     my ($level, $msg) = @_;
359
360     print "[$$] $msg" if ($verbose >= $level);
361 }