Bug 21908: Add DISTINCT biblionumber to rebuild_zebra.pl
[koha.git] / misc / migration_tools / rebuild_zebra.pl
1 #!/usr/bin/perl
2
3 # This file is part of Koha.
4 #
5 # Koha is free software; you can redistribute it and/or modify it
6 # under the terms of the GNU General Public License as published by
7 # the Free Software Foundation; either version 3 of the License, or
8 # (at your option) any later version.
9 #
10 # Koha is distributed in the hope that it will be useful, but
11 # WITHOUT ANY WARRANTY; without even the implied warranty of
12 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 # GNU General Public License for more details.
14 #
15 # You should have received a copy of the GNU General Public License
16 # along with Koha; if not, see <http://www.gnu.org/licenses>.
17
18 use Modern::Perl;
19
20 use C4::Context;
21 use Getopt::Long;
22 use Fcntl qw(:flock);
23 use File::Temp qw/ tempdir /;
24 use File::Path;
25 use C4::Biblio;
26 use C4::AuthoritiesMarc;
27 use C4::Items;
28 use Koha::RecordProcessor;
29 use Koha::Caches;
30 use XML::LibXML;
31
32 use constant LOCK_FILENAME => 'rebuild..LCK';
33
34 # script that checks zebradir structure & create directories & mandatory files if needed
35 #
36 #
37
38 $|=1; # flushes output
39 # If the cron job starts us in an unreadable dir, we will break without
40 # this.
41 chdir $ENV{HOME} if (!(-r '.'));
42 my $daemon_mode;
43 my $daemon_sleep = 5;
44 my $directory;
45 my $nosanitize;
46 my $skip_export;
47 my $keep_export;
48 my $skip_index;
49 my $reset;
50 my $biblios;
51 my $authorities;
52 my $as_xml;
53 my $noshadow;
54 my $want_help;
55 my $process_zebraqueue;
56 my $process_zebraqueue_skip_deletes;
57 my $do_not_clear_zebraqueue;
58 my $length;
59 my $where;
60 my $offset;
61 my $run_as_root;
62 my $run_user = (getpwuid($<))[0];
63 my $wait_for_lock = 0;
64 my $use_flock;
65 my $table = 'biblioitems';
66 my $is_memcached = Koha::Caches->get_instance->memcached_cache;
67
68 my $verbose_logging = 0;
69 my $zebraidx_log_opt = " -v none,fatal,warn ";
70 my $result = GetOptions(
71     'daemon'        => \$daemon_mode,
72     'sleep:i'       => \$daemon_sleep,
73     'd:s'           => \$directory,
74     'r|reset'       => \$reset,
75     's'             => \$skip_export,
76     'k'             => \$keep_export,
77     'I|skip-index'  => \$skip_index,
78     'nosanitize'    => \$nosanitize,
79     'b'             => \$biblios,
80     'w'             => \$noshadow,
81     'a'             => \$authorities,
82     'h|help'        => \$want_help,
83     'x'             => \$as_xml,
84     'y'             => \$do_not_clear_zebraqueue,
85     'z'             => \$process_zebraqueue,
86     'skip-deletes'  => \$process_zebraqueue_skip_deletes,
87     'where:s'       => \$where,
88     'length:i'      => \$length,
89     'offset:i'      => \$offset,
90     'v+'            => \$verbose_logging,
91     'run-as-root'   => \$run_as_root,
92     'wait-for-lock' => \$wait_for_lock,
93     't|table:s'     => \$table,
94 );
95
96 if (not $result or $want_help) {
97     print_usage();
98     exit 0;
99 }
100
101 if ( $as_xml ) {
102     warn "Warning: You passed -x which is already the default and is now deprecated\n";
103     undef $as_xml; # Should not be used later
104 }
105
106 if( not defined $run_as_root and $run_user eq 'root') {
107     my $msg = "Warning: You are running this script as the user 'root'.\n";
108     $msg   .= "If this is intentional you must explicitly specify this using the -run-as-root switch\n";
109     $msg   .= "Please do '$0 --help' to see usage.\n";
110     die $msg;
111 }
112
113 if ($process_zebraqueue and ($skip_export or $reset)) {
114     my $msg = "Cannot specify -r or -s if -z is specified\n";
115     $msg   .= "Please do '$0 --help' to see usage.\n";
116     die $msg;
117 }
118
119 if ($process_zebraqueue and $do_not_clear_zebraqueue) {
120     my $msg = "Cannot specify both -y and -z\n";
121     $msg   .= "Please do '$0 --help' to see usage.\n";
122     die $msg;
123 }
124
125 if ($daemon_mode) {
126     # incompatible flags handled above: help, reset, and do_not_clear_zebraqueue
127     if ($skip_export or $keep_export or $skip_index or
128           $where or $length or $offset) {
129         my $msg = "Cannot specify -s, -k, -I, -where, -length, or -offset with -daemon.\n";
130         $msg   .= "Please do '$0 --help' to see usage.\n";
131         die $msg;
132     }
133     unless ($is_memcached) {
134         warn "Warning: script running in daemon mode, without recommended caching system (memcached).\n";
135     }
136     $authorities = 1;
137     $biblios = 1;
138     $process_zebraqueue = 1;
139 }
140
141 if (not $biblios and not $authorities) {
142     my $msg = "Must specify -b or -a to reindex bibs or authorities\n";
143     $msg   .= "Please do '$0 --help' to see usage.\n";
144     die $msg;
145 }
146
147 our @tables_allowed_for_select = ( 'biblioitems', 'items', 'biblio', 'biblio_metadata' );
148 unless ( grep { /^$table$/ } @tables_allowed_for_select ) {
149     die "Cannot specify -t|--table with value '$table'. Only "
150       . ( join ', ', @tables_allowed_for_select )
151       . " are allowed.";
152 }
153
154
155 #  -v is for verbose, which seems backwards here because of how logging is set
156 #    on the CLI of zebraidx.  It works this way.  The default is to not log much
157 if ($verbose_logging >= 2) {
158     $zebraidx_log_opt = '-v none,fatal,warn,all';
159 }
160
161 my $use_tempdir = 0;
162 unless ($directory) {
163     $use_tempdir = 1;
164     $directory = tempdir(CLEANUP => ($keep_export ? 0 : 1));
165 }
166
167
168 my $biblioserverdir = C4::Context->zebraconfig('biblioserver')->{directory};
169 my $authorityserverdir = C4::Context->zebraconfig('authorityserver')->{directory};
170
171 my $kohadir = C4::Context->config('intranetdir');
172
173 my ($biblionumbertagfield,$biblionumbertagsubfield) = C4::Biblio::GetMarcFromKohaField("biblio.biblionumber","");
174 my ($biblioitemnumbertagfield,$biblioitemnumbertagsubfield) = C4::Biblio::GetMarcFromKohaField("biblioitems.biblioitemnumber","");
175
176 my $marcxml_open = q{<?xml version="1.0" encoding="UTF-8"?>
177 <collection xmlns="http://www.loc.gov/MARC21/slim">
178 };
179
180 my $marcxml_close = q{
181 </collection>
182 };
183
184 # Protect again simultaneous update of the zebra index by using a lock file.
185 # Create our own lock directory if it is missing. This should be created
186 # by koha-zebra-ctl.sh or at system installation. If the desired directory
187 # does not exist and cannot be created, we fall back on /tmp - which will
188 # always work.
189
190 my ($lockfile, $LockFH);
191 foreach (
192     C4::Context->config("zebra_lockdir"),
193     '/var/lock/zebra_' . C4::Context->config('database'),
194     '/tmp/zebra_' . C4::Context->config('database')
195 ) {
196     #we try three possibilities (we really want to lock :)
197     next if !$_;
198     ($LockFH, $lockfile) = _create_lockfile($_.'/rebuild');
199     last if defined $LockFH;
200 }
201 if( !defined $LockFH ) {
202     print "WARNING: Could not create lock file $lockfile: $!\n";
203     print "Please check your koha-conf.xml for ZEBRA_LOCKDIR.\n";
204     print "Verify file permissions for it too.\n";
205     $use_flock = 0; # we disable file locking now and will continue
206                     # without it
207                     # note that this mimics old behavior (before we used
208                     # the lockfile)
209 };
210
211 if ( $verbose_logging ) {
212     print "Zebra configuration information\n";
213     print "================================\n";
214     print "Zebra biblio directory      = $biblioserverdir\n";
215     print "Zebra authorities directory = $authorityserverdir\n";
216     print "Koha directory              = $kohadir\n";
217     print "Lockfile                    = $lockfile\n" if $lockfile;
218     print "BIBLIONUMBER in :     $biblionumbertagfield\$$biblionumbertagsubfield\n";
219     print "BIBLIOITEMNUMBER in : $biblioitemnumbertagfield\$$biblioitemnumbertagsubfield\n";
220     print "================================\n";
221 }
222
223 my $tester = XML::LibXML->new();
224 my $dbh;
225
226 # The main work is done here by calling do_one_pass().  We have added locking
227 # avoid race conditions between full rebuilds and incremental updates either from
228 # daemon mode or periodic invocation from cron.  The race can lead to an updated
229 # record being overwritten by a rebuild if the update is applied after the export
230 # by the rebuild and before the rebuild finishes (more likely to affect large
231 # catalogs).
232 #
233 # We have chosen to exit immediately by default if we cannot obtain the lock
234 # to prevent the potential for a infinite backlog from cron invocations, but an
235 # option (wait-for-lock) is provided to let the program wait for the lock.
236 # See http://bugs.koha-community.org/bugzilla3/show_bug.cgi?id=11078 for details.
237 if ($daemon_mode) {
238     while (1) {
239         # For incremental updates, skip the update if the updates are locked
240         if (_flock($LockFH, LOCK_EX|LOCK_NB)) {
241             eval {
242                 $dbh = C4::Context->dbh;
243                 if( zebraqueue_not_empty() ) {
244                     Koha::Caches->flush_L1_caches() if $is_memcached;
245                     do_one_pass();
246                 }
247             };
248             if ($@ && $verbose_logging) {
249                 warn "Warning : $@\n";
250             }
251             _flock($LockFH, LOCK_UN);
252         }
253         sleep $daemon_sleep;
254     }
255 } else {
256     # all one-off invocations
257     my $lock_mode = ($wait_for_lock) ? LOCK_EX : LOCK_EX|LOCK_NB;
258     if (_flock($LockFH, $lock_mode)) {
259         $dbh = C4::Context->dbh;
260         do_one_pass();
261         _flock($LockFH, LOCK_UN);
262     } else {
263         print "Skipping rebuild/update because flock failed on $lockfile: $!\n";
264     }
265 }
266
267
268 if ( $verbose_logging ) {
269     print "====================\n";
270     print "CLEANING\n";
271     print "====================\n";
272 }
273 if ($keep_export) {
274     print "NOTHING cleaned : the export $directory has been kept.\n";
275     print "You can re-run this script with the -s ";
276     if ($use_tempdir) {
277         print " and -d $directory parameters";
278     } else {
279         print "parameter";
280     }
281     print "\n";
282     print "if you just want to rebuild zebra after changing zebra config files\n";
283 } else {
284     unless ($use_tempdir) {
285         # if we're using a temporary directory
286         # created by File::Temp, it will be removed
287         # automatically.
288         rmtree($directory, 0, 1);
289         print "directory $directory deleted\n";
290     }
291 }
292
293 sub do_one_pass {
294     if ($authorities) {
295         index_records('authority', $directory, $skip_export, $skip_index, $process_zebraqueue, $nosanitize, $do_not_clear_zebraqueue, $verbose_logging, $zebraidx_log_opt, $authorityserverdir);
296     } else {
297         print "skipping authorities\n" if ( $verbose_logging );
298     }
299
300     if ($biblios) {
301         index_records('biblio', $directory, $skip_export, $skip_index, $process_zebraqueue, $nosanitize, $do_not_clear_zebraqueue, $verbose_logging, $zebraidx_log_opt, $biblioserverdir);
302     } else {
303         print "skipping biblios\n" if ( $verbose_logging );
304     }
305 }
306
307 # Check the zebra update queue and return true if there are records to process
308 # This routine will handle each of -ab, -a, or -b, but in practice we force
309 # -ab when in daemon mode.
310 sub zebraqueue_not_empty {
311     my $where_str;
312
313     if ($authorities && $biblios) {
314         $where_str = 'done = 0;';
315     } elsif ($biblios) {
316         $where_str = 'server = "biblioserver" AND done = 0;';
317     } else {
318         $where_str = 'server = "authorityserver" AND done = 0;';
319     }
320     my $query =
321         $dbh->prepare('SELECT COUNT(*) FROM zebraqueue WHERE ' . $where_str );
322
323     $query->execute;
324     my $count = $query->fetchrow_arrayref->[0];
325     print "queued records: $count\n" if $verbose_logging > 0;
326     return $count > 0;
327 }
328
329 # This checks to see if the zebra directories exist under the provided path.
330 # If they don't, then zebra is likely to spit the dummy. This returns true
331 # if the directories had to be created, false otherwise.
332 sub check_zebra_dirs {
333     my ($base) = shift() . '/';
334     my $needed_repairing = 0;
335     my @dirs = ( '', 'key', 'register', 'shadow', 'tmp' );
336     foreach my $dir (@dirs) {
337         my $bdir = $base . $dir;
338         if (! -d $bdir) {
339             $needed_repairing = 1;
340             mkdir $bdir || die "Unable to create '$bdir': $!\n";
341             print "$0: needed to create '$bdir'\n";
342         }
343     }
344     return $needed_repairing;
345 }   # ----------  end of subroutine check_zebra_dirs  ----------
346
347 sub index_records {
348     my ($record_type, $directory, $skip_export, $skip_index, $process_zebraqueue, $nosanitize, $do_not_clear_zebraqueue, $verbose_logging, $zebraidx_log_opt, $server_dir) = @_;
349
350     my $num_records_exported = 0;
351     my $records_deleted = {};
352     my $need_reset = check_zebra_dirs($server_dir);
353     if ($need_reset) {
354         print "$0: found broken zebra server directories: forcing a rebuild\n";
355         $reset = 1;
356     }
357     if ($skip_export && $verbose_logging) {
358         print "====================\n";
359         print "SKIPPING $record_type export\n";
360         print "====================\n";
361     } else {
362         if ( $verbose_logging ) {
363             print "====================\n";
364             print "exporting $record_type\n";
365             print "====================\n";
366         }
367         mkdir "$directory" unless (-d $directory);
368         mkdir "$directory/$record_type" unless (-d "$directory/$record_type");
369         if ($process_zebraqueue) {
370             my $entries;
371
372             unless ( $process_zebraqueue_skip_deletes ) {
373                 $entries = select_zebraqueue_records($record_type, 'deleted');
374                 mkdir "$directory/del_$record_type" unless (-d "$directory/del_$record_type");
375                 $records_deleted = generate_deleted_marc_records($record_type, $entries, "$directory/del_$record_type");
376                 mark_zebraqueue_batch_done($entries);
377             }
378
379             $entries = select_zebraqueue_records($record_type, 'updated');
380             mkdir "$directory/upd_$record_type" unless (-d "$directory/upd_$record_type");
381             $num_records_exported = export_marc_records_from_list($record_type,$entries, "$directory/upd_$record_type", $records_deleted);
382             mark_zebraqueue_batch_done($entries);
383
384         } else {
385             my $sth = select_all_records($record_type);
386             $num_records_exported = export_marc_records_from_sth($record_type, $sth, "$directory/$record_type", $nosanitize);
387             unless ($do_not_clear_zebraqueue) {
388                 mark_all_zebraqueue_done($record_type);
389             }
390         }
391     }
392
393     #
394     # and reindexing everything
395     #
396     if ($skip_index) {
397         if ($verbose_logging) {
398             print "====================\n";
399             print "SKIPPING $record_type indexing\n";
400             print "====================\n";
401         }
402     } else {
403         if ( $verbose_logging ) {
404             print "====================\n";
405             print "REINDEXING zebra\n";
406             print "====================\n";
407         }
408         my $record_fmt = 'marcxml';
409         if ($process_zebraqueue) {
410             do_indexing($record_type, 'adelete', "$directory/del_$record_type", $reset, $noshadow, $record_fmt, $zebraidx_log_opt)
411                 if %$records_deleted;
412             do_indexing($record_type, 'update', "$directory/upd_$record_type", $reset, $noshadow, $record_fmt, $zebraidx_log_opt)
413                 if $num_records_exported;
414         } else {
415             do_indexing($record_type, 'update', "$directory/$record_type", $reset, $noshadow, $record_fmt, $zebraidx_log_opt)
416                 if ($num_records_exported or $skip_export);
417         }
418     }
419 }
420
421
422 sub select_zebraqueue_records {
423     my ($record_type, $update_type) = @_;
424
425     my $server = ($record_type eq 'biblio') ? 'biblioserver' : 'authorityserver';
426     my $op = ($update_type eq 'deleted') ? 'recordDelete' : 'specialUpdate';
427
428     my $sth = $dbh->prepare("SELECT id, biblio_auth_number
429                              FROM zebraqueue
430                              WHERE server = ?
431                              AND   operation = ?
432                              AND   done = 0
433                              ORDER BY id DESC");
434     $sth->execute($server, $op);
435     my $entries = $sth->fetchall_arrayref({});
436 }
437
438 sub mark_all_zebraqueue_done {
439     my ($record_type) = @_;
440
441     my $server = ($record_type eq 'biblio') ? 'biblioserver' : 'authorityserver';
442
443     my $sth = $dbh->prepare("UPDATE zebraqueue SET done = 1
444                              WHERE server = ?
445                              AND done = 0");
446     $sth->execute($server);
447 }
448
449 sub mark_zebraqueue_batch_done {
450     my ($entries) = @_;
451
452     $dbh->{AutoCommit} = 0;
453     my $sth = $dbh->prepare("UPDATE zebraqueue SET done = 1 WHERE id = ?");
454     $dbh->commit();
455     foreach my $id (map { $_->{id} } @$entries) {
456         $sth->execute($id);
457     }
458     $dbh->{AutoCommit} = 1;
459 }
460
461 sub select_all_records {
462     my $record_type = shift;
463     return ($record_type eq 'biblio') ? select_all_biblios() : select_all_authorities();
464 }
465
466 sub select_all_authorities {
467     my $strsth=qq{SELECT authid FROM auth_header};
468     $strsth.=qq{ WHERE $where } if ($where);
469     $strsth.=qq{ LIMIT $length } if ($length && !$offset);
470     $strsth.=qq{ LIMIT $offset,$length } if ($length && $offset);
471     my $sth = $dbh->prepare($strsth);
472     $sth->execute();
473     return $sth;
474 }
475
476 sub select_all_biblios {
477     $table = 'biblioitems'
478       unless grep { /^$table$/ } @tables_allowed_for_select;
479     my $strsth = qq{ SELECT DISTINCT(biblionumber) FROM $table };
480     $strsth.=qq{ WHERE $where } if ($where);
481     $strsth.=qq{ LIMIT $length } if ($length && !$offset);
482     $strsth.=qq{ LIMIT $offset,$length } if ($offset);
483     my $sth = $dbh->prepare($strsth);
484     $sth->execute();
485     return $sth;
486 }
487
488 sub export_marc_records_from_sth {
489     my ($record_type, $sth, $directory, $nosanitize) = @_;
490
491     my $num_exported = 0;
492     open my $fh, '>:encoding(UTF-8) ', "$directory/exported_records" or die $!;
493
494     print {$fh} $marcxml_open;
495
496     my $i = 0;
497     my ( $itemtag, $itemsubfield ) = C4::Biblio::GetMarcFromKohaField("items.itemnumber",'');
498     while (my ($record_number) = $sth->fetchrow_array) {
499         print "." if ( $verbose_logging );
500         print "\r$i" unless ($i++ %100 or !$verbose_logging);
501         if ( $nosanitize ) {
502             my $marcxml = $record_type eq 'biblio'
503                           ? GetXmlBiblio( $record_number )
504                           : GetAuthorityXML( $record_number );
505             if ($record_type eq 'biblio'){
506                 my @items = GetItemsInfo($record_number);
507                 if (@items){
508                     my $record = MARC::Record->new;
509                     $record->encoding('UTF-8');
510                     my @itemsrecord;
511                     foreach my $item (@items){
512                         my $record = Item2Marc($item, $record_number);
513                         push @itemsrecord, $record->field($itemtag);
514                     }
515                     $record->insert_fields_ordered(@itemsrecord);
516                     my $itemsxml = $record->as_xml_record();
517                     $marcxml =
518                         substr($marcxml, 0, length($marcxml)-10) .
519                         substr($itemsxml, index($itemsxml, "</leader>\n", 0) + 10);
520                 }
521             }
522             # extra test to ensure that result is valid XML; otherwise
523             # Zebra won't parse it in DOM mode
524             eval {
525                 my $doc = $tester->parse_string($marcxml);
526             };
527             if ($@) {
528                 warn "Error exporting record $record_number ($record_type): $@\n";
529                 next;
530             }
531             if ( $marcxml ) {
532                 $marcxml =~ s!<\?xml version="1.0" encoding="UTF-8"\?>\n!!;
533                 print {$fh} $marcxml;
534                 $num_exported++;
535             }
536             next;
537         }
538         my ($marc) = get_corrected_marc_record($record_type, $record_number);
539         if (defined $marc) {
540             eval {
541                 my $rec = $marc->as_xml_record(C4::Context->preference('marcflavour'));
542                 eval {
543                     my $doc = $tester->parse_string($rec);
544                 };
545                 if ($@) {
546                     die "invalid XML: $@";
547                 }
548                 $rec =~ s!<\?xml version="1.0" encoding="UTF-8"\?>\n!!;
549                 print {$fh} $rec;
550                 $num_exported++;
551             };
552             if ($@) {
553                 warn "Error exporting record $record_number ($record_type) XML";
554                 warn "... specific error is $@" if $verbose_logging;
555             }
556         }
557     }
558     print "\nRecords exported: $num_exported\n" if ( $verbose_logging );
559     print {$fh} $marcxml_close;
560
561     close $fh;
562     return $num_exported;
563 }
564
565 sub export_marc_records_from_list {
566     my ($record_type, $entries, $directory, $records_deleted) = @_;
567
568     my $num_exported = 0;
569     open my $fh, '>:encoding(UTF-8)', "$directory/exported_records" or die $!;
570
571     print {$fh} $marcxml_open;
572
573     my $i = 0;
574
575     # Skip any deleted records. We check for this anyway, but this reduces error spam
576     my %found = %$records_deleted;
577     foreach my $record_number ( map { $_->{biblio_auth_number} }
578                                 grep { !$found{ $_->{biblio_auth_number} }++ }
579                                 @$entries ) {
580         print "." if ( $verbose_logging );
581         print "\r$i" unless ($i++ %100 or !$verbose_logging);
582         my ($marc) = get_corrected_marc_record($record_type, $record_number);
583         if (defined $marc) {
584             eval {
585                 my $rec = $marc->as_xml_record(C4::Context->preference('marcflavour'));
586                 $rec =~ s!<\?xml version="1.0" encoding="UTF-8"\?>\n!!;
587                 print {$fh} $rec;
588                 $num_exported++;
589             };
590             if ($@) {
591               warn "Error exporting record $record_number ($record_type) XML";
592             }
593         }
594     }
595     print "\nRecords exported: $num_exported\n" if ( $verbose_logging );
596
597     print {$fh} $marcxml_close;
598
599     close $fh;
600     return $num_exported;
601 }
602
603 sub generate_deleted_marc_records {
604
605     my ($record_type, $entries, $directory) = @_;
606
607     my $records_deleted = {};
608     open my $fh, '>:encoding(UTF-8)', "$directory/exported_records" or die $!;
609
610     print {$fh} $marcxml_open;
611
612     my $i = 0;
613     foreach my $record_number (map { $_->{biblio_auth_number} } @$entries ) {
614         print "\r$i" unless ($i++ %100 or !$verbose_logging);
615         print "." if ( $verbose_logging );
616
617         my $marc = MARC::Record->new();
618         if ($record_type eq 'biblio') {
619             fix_biblio_ids($marc, $record_number, $record_number);
620         } else {
621             fix_authority_id($marc, $record_number);
622         }
623         if (C4::Context->preference("marcflavour") eq "UNIMARC") {
624             fix_unimarc_100($marc);
625         }
626
627         my $rec = $marc->as_xml_record(C4::Context->preference('marcflavour'));
628         # Remove the record's XML header
629         $rec =~ s!<\?xml version="1.0" encoding="UTF-8"\?>\n!!;
630         print {$fh} $rec;
631
632         $records_deleted->{$record_number} = 1;
633     }
634     print "\nRecords exported: $i\n" if ( $verbose_logging );
635
636     print {$fh} $marcxml_close;
637
638     close $fh;
639     return $records_deleted;
640 }
641
642 sub get_corrected_marc_record {
643     my ( $record_type, $record_number ) = @_;
644
645     my $marc = get_raw_marc_record( $record_type, $record_number );
646
647     if ( defined $marc ) {
648         fix_leader($marc);
649         if ( $record_type eq 'authority' ) {
650             fix_authority_id( $marc, $record_number );
651         }
652         elsif ( $record_type eq 'biblio' ) {
653
654             my @filters;
655             push @filters, 'EmbedItemsAvailability';
656             push @filters, 'EmbedSeeFromHeadings'
657                 if C4::Context->preference('IncludeSeeFromInSearches');
658
659             my $normalizer = Koha::RecordProcessor->new( { filters => \@filters } );
660             $marc = $normalizer->process($marc);
661         }
662         if ( C4::Context->preference("marcflavour") eq "UNIMARC" ) {
663             fix_unimarc_100($marc);
664         }
665     }
666
667     return $marc;
668 }
669
670 sub get_raw_marc_record {
671     my ($record_type, $record_number) = @_;
672
673     my $marc;
674     if ($record_type eq 'biblio') {
675         eval { $marc = C4::Biblio::GetMarcBiblio({ biblionumber => $record_number, embed_items => 1 }); };
676         if ($@ || !$marc) {
677             # here we do warn since catching an exception
678             # means that the bib was found but failed
679             # to be parsed
680             warn "error retrieving biblio $record_number";
681             return;
682         }
683     } else {
684         eval { $marc = GetAuthority($record_number); };
685         if ($@) {
686             warn "error retrieving authority $record_number";
687             return;
688         }
689     }
690     return $marc;
691 }
692
693 sub fix_leader {
694     # FIXME - this routine is suspect
695     # It blanks the Leader/00-05 and Leader/12-16 to
696     # force them to be recalculated correct when
697     # the $marc->as_usmarc() or $marc->as_xml() is called.
698     # But why is this necessary?  It would be a serious bug
699     # in MARC::Record (definitely) and MARC::File::XML (arguably)
700     # if they are emitting incorrect leader values.
701     my $marc = shift;
702
703     my $leader = $marc->leader;
704     substr($leader,  0, 5) = '     ';
705     substr($leader, 10, 7) = '22     ';
706     $marc->leader(substr($leader, 0, 24));
707 }
708
709 sub fix_biblio_ids {
710     # FIXME - it is essential to ensure that the biblionumber is present,
711     #         otherwise, Zebra will choke on the record.  However, this
712     #         logic belongs in the relevant C4::Biblio APIs.
713     my $marc = shift;
714     my $biblionumber = shift;
715     my $biblioitemnumber;
716     if (@_) {
717         $biblioitemnumber = shift;
718     } else {
719         my $sth = $dbh->prepare(
720             "SELECT biblioitemnumber FROM biblioitems WHERE biblionumber=?");
721         $sth->execute($biblionumber);
722         ($biblioitemnumber) = $sth->fetchrow_array;
723         $sth->finish;
724         unless ($biblioitemnumber) {
725             warn "failed to get biblioitemnumber for biblio $biblionumber";
726             return 0;
727         }
728     }
729
730     # FIXME - this is cheating on two levels
731     # 1. C4::Biblio::_koha_marc_update_bib_ids is meant to be an internal function
732     # 2. Making sure that the biblionumber and biblioitemnumber are correct and
733     #    present in the MARC::Record object ought to be part of GetMarcBiblio.
734     #
735     # On the other hand, this better for now than what rebuild_zebra.pl used to
736     # do, which was duplicate the code for inserting the biblionumber
737     # and biblioitemnumber
738     C4::Biblio::_koha_marc_update_bib_ids($marc, '', $biblionumber, $biblioitemnumber);
739
740     return 1;
741 }
742
743 sub fix_authority_id {
744     # FIXME - as with fix_biblio_ids, the authid must be present
745     #         for Zebra's sake.  However, this really belongs
746     #         in C4::AuthoritiesMarc.
747     my ($marc, $authid) = @_;
748     unless ($marc->field('001') and $marc->field('001')->data() eq $authid){
749         $marc->delete_field($marc->field('001'));
750         $marc->insert_fields_ordered(MARC::Field->new('001',$authid));
751     }
752 }
753
754 sub fix_unimarc_100 {
755     # FIXME - again, if this is necessary, it belongs in C4::AuthoritiesMarc.
756     my $marc = shift;
757
758     my $string;
759     my $length_100a = length($marc->subfield( 100, "a" ));
760     if (  $length_100a and $length_100a == 36 ) {
761         $string = $marc->subfield( 100, "a" );
762         my $f100 = $marc->field(100);
763         $marc->delete_field($f100);
764     }
765     else {
766         $string = POSIX::strftime( "%Y%m%d", localtime );
767         $string =~ s/\-//g;
768         $string = sprintf( "%-*s", 35, $string );
769     }
770     substr( $string, 22, 6, "frey50" );
771     $length_100a = length($marc->subfield( 100, "a" ));
772     unless ( $length_100a and $length_100a == 36 ) {
773         $marc->delete_field($marc->field(100));
774         $marc->insert_grouped_field(MARC::Field->new( 100, "", "", "a" => $string ));
775     }
776 }
777
778 sub do_indexing {
779     my ($record_type, $op, $record_dir, $reset_index, $noshadow, $record_format, $zebraidx_log_opt) = @_;
780
781     my $zebra_server  = ($record_type eq 'biblio') ? 'biblioserver' : 'authorityserver';
782     my $zebra_db_name = ($record_type eq 'biblio') ? 'biblios' : 'authorities';
783     my $zebra_config  = C4::Context->zebraconfig($zebra_server)->{'config'};
784     my $zebra_db_dir  = C4::Context->zebraconfig($zebra_server)->{'directory'};
785
786     $noshadow //= '';
787
788     if ($noshadow or $reset_index) {
789         $noshadow = '-n';
790     }
791
792     system("zebraidx -c $zebra_config $zebraidx_log_opt -g $record_format -d $zebra_db_name init") if $reset_index;
793     system("zebraidx -c $zebra_config $zebraidx_log_opt $noshadow -g $record_format -d $zebra_db_name $op $record_dir");
794     system("zebraidx -c $zebra_config $zebraidx_log_opt -g $record_format -d $zebra_db_name commit") unless $noshadow;
795 }
796
797 sub _flock {
798     # test if flock is present; if so, use it; if not, return true
799     # op refers to the official flock operations including LOCK_EX,
800     # LOCK_UN, etc.
801     # combining LOCK_EX with LOCK_NB returns immediately
802     my ($fh, $op)= @_;
803     if( !defined($use_flock) ) {
804         #check if flock is present; if not, you will have a fatal error
805         my $lock_acquired = eval { flock($fh, $op) };
806         # assuming that $fh and $op are fine(..), an undef $lock_acquired
807         # means no flock
808         $use_flock = defined($lock_acquired) ? 1 : 0;
809         print "Warning: flock could not be used!\n" if $verbose_logging && !$use_flock;
810         return 1 if !$use_flock;
811         return $lock_acquired;
812     } else {
813         return 1 if !$use_flock;
814         return flock($fh, $op);
815     }
816 }
817
818 sub _create_lockfile { #returns undef on failure
819     my $dir= shift;
820     unless (-d $dir) {
821         eval { mkpath($dir, 0, oct(755)) };
822         return if $@;
823     }
824     return if !open my $fh, q{>}, $dir.'/'.LOCK_FILENAME;
825     return ( $fh, $dir.'/'.LOCK_FILENAME );
826 }
827
828 sub print_usage {
829     print <<_USAGE_;
830 $0: reindex MARC bibs and/or authorities in Zebra.
831
832 Use this batch job to reindex all biblio or authority
833 records in your Koha database.
834
835 Parameters:
836
837     -b                      index bibliographic records
838
839     -a                      index authority records
840
841     -daemon                 Run in daemon mode.  The program will loop checking
842                             for entries on the zebraqueue table, processing
843                             them incrementally if present, and then sleep
844                             for a few seconds before repeating the process
845                             Checking the zebraqueue table is done with a cheap
846                             SQL query.  This allows for near realtime update of
847                             the zebra search index with low system overhead.
848                             Use -sleep to control the checking interval.
849
850                             Daemon mode implies -z, -a, -b.  The program will
851                             refuse to start if options are present that do not
852                             make sense while running as an incremental update
853                             daemon (e.g. -r or -offset).
854
855     -sleep 10               Seconds to sleep between checks of the zebraqueue
856                             table in daemon mode.  The default is 5 seconds.
857
858     -z                      select only updated and deleted
859                             records marked in the zebraqueue
860                             table.  Cannot be used with -r
861                             or -s.
862
863     --skip-deletes          only select record updates, not record
864                             deletions, to avoid potential excessive
865                             I/O when zebraidx processes deletions.
866                             If this option is used for normal indexing,
867                             a cronjob should be set up to run
868                             rebuild_zebra.pl -z without --skip-deletes
869                             during off hours.
870                             Only effective with -z.
871
872     -r                      clear Zebra index before
873                             adding records to index. Implies -w.
874
875     -d                      Temporary directory for indexing.
876                             If not specified, one is automatically
877                             created.  The export directory
878                             is automatically deleted unless
879                             you supply the -k switch.
880
881     -k                      Do not delete export directory.
882
883     -s                      Skip export.  Used if you have
884                             already exported the records
885                             in a previous run.
886
887     -nosanitize             export biblio/authority records directly from DB marcxml
888                             field without sanitizing records. It speed up
889                             dump process but could fail if DB contains badly
890                             encoded records. Works only with -x,
891
892     -w                      skip shadow indexing for this batch
893
894     -y                      do NOT clear zebraqueue after indexing; normally,
895                             after doing batch indexing, zebraqueue should be
896                             marked done for the affected record type(s) so that
897                             a running zebraqueue_daemon doesn't try to reindex
898                             the same records - specify -y to override this.
899                             Cannot be used with -z.
900
901     -v                      increase the amount of logging.  Normally only
902                             warnings and errors from the indexing are shown.
903                             Use log level 2 (-v -v) to include all Zebra logs.
904
905     --length   1234         how many biblio you want to export
906     --offset 1243           offset you want to start to
907                                 example: --offset 500 --length=500 will result in a LIMIT 500,1000 (exporting 1000 records, starting by the 500th one)
908                                 note that the numbers are NOT related to biblionumber, that's the intended behaviour.
909     --where                 let you specify a WHERE query, like itemtype='BOOK'
910                             or something like that
911
912     --run-as-root           explicitily allow script to run as 'root' user
913
914     --wait-for-lock         when not running in daemon mode, the default
915                             behavior is to abort a rebuild if the rebuild
916                             lock is busy.  This option will cause the program
917                             to wait for the lock to free and then continue
918                             processing the rebuild request,
919
920     --table                 specify a table (can be items, biblioitems, biblio, biblio_metadata) to retrieve biblionumber to index.
921                             biblioitems is the default value.
922
923     --help or -h            show this message.
924 _USAGE_
925 }