Bug 17935: Adjust some POD lines, fix a few typos
[koha.git] / misc / migration_tools / rebuild_zebra.pl
1 #!/usr/bin/perl
2
3 # This file is part of Koha.
4 #
5 # Koha is free software; you can redistribute it and/or modify it
6 # under the terms of the GNU General Public License as published by
7 # the Free Software Foundation; either version 3 of the License, or
8 # (at your option) any later version.
9 #
10 # Koha is distributed in the hope that it will be useful, but
11 # WITHOUT ANY WARRANTY; without even the implied warranty of
12 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 # GNU General Public License for more details.
14 #
15 # You should have received a copy of the GNU General Public License
16 # along with Koha; if not, see <http://www.gnu.org/licenses>.
17
18 use Modern::Perl;
19
20 use C4::Context;
21 use Getopt::Long;
22 use Fcntl qw(:flock);
23 use File::Temp qw/ tempdir /;
24 use File::Path;
25 use C4::Biblio;
26 use C4::AuthoritiesMarc;
27 use C4::Items;
28 use Koha::RecordProcessor;
29 use XML::LibXML;
30
31 use constant LOCK_FILENAME => 'rebuild..LCK';
32
33 # script that checks zebradir structure & create directories & mandatory files if needed
34 #
35 #
36
37 $|=1; # flushes output
38 # If the cron job starts us in an unreadable dir, we will break without
39 # this.
40 chdir $ENV{HOME} if (!(-r '.'));
41 my $daemon_mode;
42 my $daemon_sleep = 5;
43 my $directory;
44 my $nosanitize;
45 my $skip_export;
46 my $keep_export;
47 my $skip_index;
48 my $reset;
49 my $biblios;
50 my $authorities;
51 my $as_xml;
52 my $noshadow;
53 my $want_help;
54 my $process_zebraqueue;
55 my $process_zebraqueue_skip_deletes;
56 my $do_not_clear_zebraqueue;
57 my $length;
58 my $where;
59 my $offset;
60 my $run_as_root;
61 my $run_user = (getpwuid($<))[0];
62 my $wait_for_lock = 0;
63 my $use_flock;
64 my $table = 'biblioitems';
65
66 my $verbose_logging = 0;
67 my $zebraidx_log_opt = " -v none,fatal,warn ";
68 my $result = GetOptions(
69     'daemon'        => \$daemon_mode,
70     'sleep:i'       => \$daemon_sleep,
71     'd:s'           => \$directory,
72     'r|reset'       => \$reset,
73     's'             => \$skip_export,
74     'k'             => \$keep_export,
75     'I|skip-index'  => \$skip_index,
76     'nosanitize'    => \$nosanitize,
77     'b'             => \$biblios,
78     'w'             => \$noshadow,
79     'a'             => \$authorities,
80     'h|help'        => \$want_help,
81     'x'             => \$as_xml,
82     'y'             => \$do_not_clear_zebraqueue,
83     'z'             => \$process_zebraqueue,
84     'skip-deletes'  => \$process_zebraqueue_skip_deletes,
85     'where:s'       => \$where,
86     'length:i'      => \$length,
87     'offset:i'      => \$offset,
88     'v+'            => \$verbose_logging,
89     'run-as-root'   => \$run_as_root,
90     'wait-for-lock' => \$wait_for_lock,
91     't|table:s'     => \$table,
92 );
93
94 if (not $result or $want_help) {
95     print_usage();
96     exit 0;
97 }
98
99 if ( $as_xml ) {
100     warn "Warning: You passed -x which is already the default and is now deprecated\n";
101     undef $as_xml; # Should not be used later
102 }
103
104 if( not defined $run_as_root and $run_user eq 'root') {
105     my $msg = "Warning: You are running this script as the user 'root'.\n";
106     $msg   .= "If this is intentional you must explicitly specify this using the -run-as-root switch\n";
107     $msg   .= "Please do '$0 --help' to see usage.\n";
108     die $msg;
109 }
110
111 if ($process_zebraqueue and ($skip_export or $reset)) {
112     my $msg = "Cannot specify -r or -s if -z is specified\n";
113     $msg   .= "Please do '$0 --help' to see usage.\n";
114     die $msg;
115 }
116
117 if ($process_zebraqueue and $do_not_clear_zebraqueue) {
118     my $msg = "Cannot specify both -y and -z\n";
119     $msg   .= "Please do '$0 --help' to see usage.\n";
120     die $msg;
121 }
122
123 if ($daemon_mode) {
124     # incompatible flags handled above: help, reset, and do_not_clear_zebraqueue
125     if ($skip_export or $keep_export or $skip_index or
126           $where or $length or $offset) {
127         my $msg = "Cannot specify -s, -k, -I, -where, -length, or -offset with -daemon.\n";
128         $msg   .= "Please do '$0 --help' to see usage.\n";
129         die $msg;
130     }
131     $authorities = 1;
132     $biblios = 1;
133     $process_zebraqueue = 1;
134 }
135
136 if (not $biblios and not $authorities) {
137     my $msg = "Must specify -b or -a to reindex bibs or authorities\n";
138     $msg   .= "Please do '$0 --help' to see usage.\n";
139     die $msg;
140 }
141
142 our @tables_allowed_for_select = ( 'biblioitems', 'items', 'biblio' );
143 unless ( grep { /^$table$/ } @tables_allowed_for_select ) {
144     die "Cannot specify -t|--table with value '$table'. Only "
145       . ( join ', ', @tables_allowed_for_select )
146       . " are allowed.";
147 }
148
149
150 #  -v is for verbose, which seems backwards here because of how logging is set
151 #    on the CLI of zebraidx.  It works this way.  The default is to not log much
152 if ($verbose_logging >= 2) {
153     $zebraidx_log_opt = '-v none,fatal,warn,all';
154 }
155
156 my $use_tempdir = 0;
157 unless ($directory) {
158     $use_tempdir = 1;
159     $directory = tempdir(CLEANUP => ($keep_export ? 0 : 1));
160 }
161
162
163 my $biblioserverdir = C4::Context->zebraconfig('biblioserver')->{directory};
164 my $authorityserverdir = C4::Context->zebraconfig('authorityserver')->{directory};
165
166 my $kohadir = C4::Context->config('intranetdir');
167 my $bib_index_mode  = C4::Context->config('zebra_bib_index_mode')  // 'dom';
168 my $auth_index_mode = C4::Context->config('zebra_auth_index_mode') // 'dom';
169
170 my ($biblionumbertagfield,$biblionumbertagsubfield) = &GetMarcFromKohaField("biblio.biblionumber","");
171 my ($biblioitemnumbertagfield,$biblioitemnumbertagsubfield) = &GetMarcFromKohaField("biblioitems.biblioitemnumber","");
172
173 my $marcxml_open = q{<?xml version="1.0" encoding="UTF-8"?>
174 <collection xmlns="http://www.loc.gov/MARC21/slim">
175 };
176
177 my $marcxml_close = q{
178 </collection>
179 };
180
181 # Protect again simultaneous update of the zebra index by using a lock file.
182 # Create our own lock directory if it is missing. This should be created
183 # by koha-zebra-ctl.sh or at system installation. If the desired directory
184 # does not exist and cannot be created, we fall back on /tmp - which will
185 # always work.
186
187 my ($lockfile, $LockFH);
188 foreach (
189     C4::Context->config("zebra_lockdir"),
190     '/var/lock/zebra_' . C4::Context->config('database'),
191     '/tmp/zebra_' . C4::Context->config('database')
192 ) {
193     #we try three possibilities (we really want to lock :)
194     next if !$_;
195     ($LockFH, $lockfile) = _create_lockfile($_.'/rebuild');
196     last if defined $LockFH;
197 }
198 if( !defined $LockFH ) {
199     print "WARNING: Could not create lock file $lockfile: $!\n";
200     print "Please check your koha-conf.xml for ZEBRA_LOCKDIR.\n";
201     print "Verify file permissions for it too.\n";
202     $use_flock = 0; # we disable file locking now and will continue
203                     # without it
204                     # note that this mimics old behavior (before we used
205                     # the lockfile)
206 };
207
208 if ( $verbose_logging ) {
209     print "Zebra configuration information\n";
210     print "================================\n";
211     print "Zebra biblio directory      = $biblioserverdir\n";
212     print "Zebra authorities directory = $authorityserverdir\n";
213     print "Koha directory              = $kohadir\n";
214     print "Lockfile                    = $lockfile\n" if $lockfile;
215     print "BIBLIONUMBER in :     $biblionumbertagfield\$$biblionumbertagsubfield\n";
216     print "BIBLIOITEMNUMBER in : $biblioitemnumbertagfield\$$biblioitemnumbertagsubfield\n";
217     print "================================\n";
218 }
219
220 my $tester = XML::LibXML->new();
221 my $dbh;
222
223 # The main work is done here by calling do_one_pass().  We have added locking
224 # avoid race conditions between full rebuilds and incremental updates either from
225 # daemon mode or periodic invocation from cron.  The race can lead to an updated
226 # record being overwritten by a rebuild if the update is applied after the export
227 # by the rebuild and before the rebuild finishes (more likely to affect large
228 # catalogs).
229 #
230 # We have chosen to exit immediately by default if we cannot obtain the lock
231 # to prevent the potential for a infinite backlog from cron invocations, but an
232 # option (wait-for-lock) is provided to let the program wait for the lock.
233 # See http://bugs.koha-community.org/bugzilla3/show_bug.cgi?id=11078 for details.
234 if ($daemon_mode) {
235     while (1) {
236         # For incremental updates, skip the update if the updates are locked
237         if (_flock($LockFH, LOCK_EX|LOCK_NB)) {
238             eval {
239                 $dbh = C4::Context->dbh;
240                 do_one_pass() if ( zebraqueue_not_empty() );
241             };
242             if ($@ && $verbose_logging) {
243                 warn "Warning : $@\n";
244             }
245             _flock($LockFH, LOCK_UN);
246         }
247         sleep $daemon_sleep;
248     }
249 } else {
250     # all one-off invocations
251     my $lock_mode = ($wait_for_lock) ? LOCK_EX : LOCK_EX|LOCK_NB;
252     if (_flock($LockFH, $lock_mode)) {
253         $dbh = C4::Context->dbh;
254         do_one_pass();
255         _flock($LockFH, LOCK_UN);
256     } else {
257         print "Skipping rebuild/update because flock failed on $lockfile: $!\n";
258     }
259 }
260
261
262 if ( $verbose_logging ) {
263     print "====================\n";
264     print "CLEANING\n";
265     print "====================\n";
266 }
267 if ($keep_export) {
268     print "NOTHING cleaned : the export $directory has been kept.\n";
269     print "You can re-run this script with the -s ";
270     if ($use_tempdir) {
271         print " and -d $directory parameters";
272     } else {
273         print "parameter";
274     }
275     print "\n";
276     print "if you just want to rebuild zebra after changing the record.abs\n";
277     print "or another zebra config file\n";
278 } else {
279     unless ($use_tempdir) {
280         # if we're using a temporary directory
281         # created by File::Temp, it will be removed
282         # automatically.
283         rmtree($directory, 0, 1);
284         print "directory $directory deleted\n";
285     }
286 }
287
288 sub do_one_pass {
289     if ($authorities) {
290         index_records('authority', $directory, $skip_export, $skip_index, $process_zebraqueue, $nosanitize, $do_not_clear_zebraqueue, $verbose_logging, $zebraidx_log_opt, $authorityserverdir);
291     } else {
292         print "skipping authorities\n" if ( $verbose_logging );
293     }
294
295     if ($biblios) {
296         index_records('biblio', $directory, $skip_export, $skip_index, $process_zebraqueue, $nosanitize, $do_not_clear_zebraqueue, $verbose_logging, $zebraidx_log_opt, $biblioserverdir);
297     } else {
298         print "skipping biblios\n" if ( $verbose_logging );
299     }
300 }
301
302 # Check the zebra update queue and return true if there are records to process
303 # This routine will handle each of -ab, -a, or -b, but in practice we force
304 # -ab when in daemon mode.
305 sub zebraqueue_not_empty {
306     my $where_str;
307
308     if ($authorities && $biblios) {
309         $where_str = 'done = 0;';
310     } elsif ($biblios) {
311         $where_str = 'server = "biblioserver" AND done = 0;';
312     } else {
313         $where_str = 'server = "authorityserver" AND done = 0;';
314     }
315     my $query =
316         $dbh->prepare('SELECT COUNT(*) FROM zebraqueue WHERE ' . $where_str );
317
318     $query->execute;
319     my $count = $query->fetchrow_arrayref->[0];
320     print "queued records: $count\n" if $verbose_logging > 0;
321     return $count > 0;
322 }
323
324 # This checks to see if the zebra directories exist under the provided path.
325 # If they don't, then zebra is likely to spit the dummy. This returns true
326 # if the directories had to be created, false otherwise.
327 sub check_zebra_dirs {
328     my ($base) = shift() . '/';
329     my $needed_repairing = 0;
330     my @dirs = ( '', 'key', 'register', 'shadow', 'tmp' );
331     foreach my $dir (@dirs) {
332         my $bdir = $base . $dir;
333         if (! -d $bdir) {
334             $needed_repairing = 1;
335             mkdir $bdir || die "Unable to create '$bdir': $!\n";
336             print "$0: needed to create '$bdir'\n";
337         }
338     }
339     return $needed_repairing;
340 }   # ----------  end of subroutine check_zebra_dirs  ----------
341
342 sub index_records {
343     my ($record_type, $directory, $skip_export, $skip_index, $process_zebraqueue, $nosanitize, $do_not_clear_zebraqueue, $verbose_logging, $zebraidx_log_opt, $server_dir) = @_;
344
345     my $num_records_exported = 0;
346     my $records_deleted = {};
347     my $need_reset = check_zebra_dirs($server_dir);
348     if ($need_reset) {
349         print "$0: found broken zebra server directories: forcing a rebuild\n";
350         $reset = 1;
351     }
352     if ($skip_export && $verbose_logging) {
353         print "====================\n";
354         print "SKIPPING $record_type export\n";
355         print "====================\n";
356     } else {
357         if ( $verbose_logging ) {
358             print "====================\n";
359             print "exporting $record_type\n";
360             print "====================\n";
361         }
362         mkdir "$directory" unless (-d $directory);
363         mkdir "$directory/$record_type" unless (-d "$directory/$record_type");
364         if ($process_zebraqueue) {
365             my $entries;
366
367             unless ( $process_zebraqueue_skip_deletes ) {
368                 $entries = select_zebraqueue_records($record_type, 'deleted');
369                 mkdir "$directory/del_$record_type" unless (-d "$directory/del_$record_type");
370                 $records_deleted = generate_deleted_marc_records($record_type, $entries, "$directory/del_$record_type");
371                 mark_zebraqueue_batch_done($entries);
372             }
373
374             $entries = select_zebraqueue_records($record_type, 'updated');
375             mkdir "$directory/upd_$record_type" unless (-d "$directory/upd_$record_type");
376             $num_records_exported = export_marc_records_from_list($record_type,$entries, "$directory/upd_$record_type", $records_deleted);
377             mark_zebraqueue_batch_done($entries);
378
379         } else {
380             my $sth = select_all_records($record_type);
381             $num_records_exported = export_marc_records_from_sth($record_type, $sth, "$directory/$record_type", $nosanitize);
382             unless ($do_not_clear_zebraqueue) {
383                 mark_all_zebraqueue_done($record_type);
384             }
385         }
386     }
387
388     #
389     # and reindexing everything
390     #
391     if ($skip_index) {
392         if ($verbose_logging) {
393             print "====================\n";
394             print "SKIPPING $record_type indexing\n";
395             print "====================\n";
396         }
397     } else {
398         if ( $verbose_logging ) {
399             print "====================\n";
400             print "REINDEXING zebra\n";
401             print "====================\n";
402         }
403         my $record_fmt = 'marcxml';
404         if ($process_zebraqueue) {
405             do_indexing($record_type, 'adelete', "$directory/del_$record_type", $reset, $noshadow, $record_fmt, $zebraidx_log_opt)
406                 if %$records_deleted;
407             do_indexing($record_type, 'update', "$directory/upd_$record_type", $reset, $noshadow, $record_fmt, $zebraidx_log_opt)
408                 if $num_records_exported;
409         } else {
410             do_indexing($record_type, 'update', "$directory/$record_type", $reset, $noshadow, $record_fmt, $zebraidx_log_opt)
411                 if ($num_records_exported or $skip_export);
412         }
413     }
414 }
415
416
417 sub select_zebraqueue_records {
418     my ($record_type, $update_type) = @_;
419
420     my $server = ($record_type eq 'biblio') ? 'biblioserver' : 'authorityserver';
421     my $op = ($update_type eq 'deleted') ? 'recordDelete' : 'specialUpdate';
422
423     my $sth = $dbh->prepare("SELECT id, biblio_auth_number
424                              FROM zebraqueue
425                              WHERE server = ?
426                              AND   operation = ?
427                              AND   done = 0
428                              ORDER BY id DESC");
429     $sth->execute($server, $op);
430     my $entries = $sth->fetchall_arrayref({});
431 }
432
433 sub mark_all_zebraqueue_done {
434     my ($record_type) = @_;
435
436     my $server = ($record_type eq 'biblio') ? 'biblioserver' : 'authorityserver';
437
438     my $sth = $dbh->prepare("UPDATE zebraqueue SET done = 1
439                              WHERE server = ?
440                              AND done = 0");
441     $sth->execute($server);
442 }
443
444 sub mark_zebraqueue_batch_done {
445     my ($entries) = @_;
446
447     $dbh->{AutoCommit} = 0;
448     my $sth = $dbh->prepare("UPDATE zebraqueue SET done = 1 WHERE id = ?");
449     $dbh->commit();
450     foreach my $id (map { $_->{id} } @$entries) {
451         $sth->execute($id);
452     }
453     $dbh->{AutoCommit} = 1;
454 }
455
456 sub select_all_records {
457     my $record_type = shift;
458     return ($record_type eq 'biblio') ? select_all_biblios() : select_all_authorities();
459 }
460
461 sub select_all_authorities {
462     my $strsth=qq{SELECT authid FROM auth_header};
463     $strsth.=qq{ WHERE $where } if ($where);
464     $strsth.=qq{ LIMIT $length } if ($length && !$offset);
465     $strsth.=qq{ LIMIT $offset,$length } if ($length && $offset);
466     my $sth = $dbh->prepare($strsth);
467     $sth->execute();
468     return $sth;
469 }
470
471 sub select_all_biblios {
472     $table = 'biblioitems'
473       unless grep { /^$table$/ } @tables_allowed_for_select;
474     my $strsth = qq{ SELECT biblionumber FROM $table };
475     $strsth.=qq{ WHERE $where } if ($where);
476     $strsth.=qq{ LIMIT $length } if ($length && !$offset);
477     $strsth.=qq{ LIMIT $offset,$length } if ($offset);
478     my $sth = $dbh->prepare($strsth);
479     $sth->execute();
480     return $sth;
481 }
482
483 sub export_marc_records_from_sth {
484     my ($record_type, $sth, $directory, $nosanitize) = @_;
485
486     my $num_exported = 0;
487     open my $fh, '>:encoding(UTF-8) ', "$directory/exported_records" or die $!;
488
489     print {$fh} $marcxml_open;
490
491     my $i = 0;
492     my ( $itemtag, $itemsubfield ) = GetMarcFromKohaField("items.itemnumber",'');
493     while (my ($record_number) = $sth->fetchrow_array) {
494         print "." if ( $verbose_logging );
495         print "\r$i" unless ($i++ %100 or !$verbose_logging);
496         if ( $nosanitize ) {
497             my $marcxml = $record_type eq 'biblio'
498                           ? GetXmlBiblio( $record_number )
499                           : GetAuthorityXML( $record_number );
500             if ($record_type eq 'biblio'){
501                 my @items = GetItemsInfo($record_number);
502                 if (@items){
503                     my $record = MARC::Record->new;
504                     $record->encoding('UTF-8');
505                     my @itemsrecord;
506                     foreach my $item (@items){
507                         my $record = Item2Marc($item, $record_number);
508                         push @itemsrecord, $record->field($itemtag);
509                     }
510                     $record->insert_fields_ordered(@itemsrecord);
511                     my $itemsxml = $record->as_xml_record();
512                     $marcxml =
513                         substr($marcxml, 0, length($marcxml)-10) .
514                         substr($itemsxml, index($itemsxml, "</leader>\n", 0) + 10);
515                 }
516             }
517             # extra test to ensure that result is valid XML; otherwise
518             # Zebra won't parse it in DOM mode
519             eval {
520                 my $doc = $tester->parse_string($marcxml);
521             };
522             if ($@) {
523                 warn "Error exporting record $record_number ($record_type): $@\n";
524                 next;
525             }
526             if ( $marcxml ) {
527                 $marcxml =~ s!<\?xml version="1.0" encoding="UTF-8"\?>\n!!;
528                 print {$fh} $marcxml;
529                 $num_exported++;
530             }
531             next;
532         }
533         my ($marc) = get_corrected_marc_record($record_type, $record_number);
534         if (defined $marc) {
535             eval {
536                 my $rec = $marc->as_xml_record(C4::Context->preference('marcflavour'));
537                 eval {
538                     my $doc = $tester->parse_string($rec);
539                 };
540                 if ($@) {
541                     die "invalid XML: $@";
542                 }
543                 $rec =~ s!<\?xml version="1.0" encoding="UTF-8"\?>\n!!;
544                 print {$fh} $rec;
545                 $num_exported++;
546             };
547             if ($@) {
548                 warn "Error exporting record $record_number ($record_type) XML";
549                 warn "... specific error is $@" if $verbose_logging;
550             }
551         }
552     }
553     print "\nRecords exported: $num_exported\n" if ( $verbose_logging );
554     print {$fh} $marcxml_close;
555
556     close $fh;
557     return $num_exported;
558 }
559
560 sub export_marc_records_from_list {
561     my ($record_type, $entries, $directory, $records_deleted) = @_;
562
563     my $num_exported = 0;
564     open my $fh, '>:encoding(UTF-8)', "$directory/exported_records" or die $!;
565
566     print {$fh} $marcxml_open;
567
568     my $i = 0;
569
570     # Skip any deleted records. We check for this anyway, but this reduces error spam
571     my %found = %$records_deleted;
572     foreach my $record_number ( map { $_->{biblio_auth_number} }
573                                 grep { !$found{ $_->{biblio_auth_number} }++ }
574                                 @$entries ) {
575         print "." if ( $verbose_logging );
576         print "\r$i" unless ($i++ %100 or !$verbose_logging);
577         my ($marc) = get_corrected_marc_record($record_type, $record_number);
578         if (defined $marc) {
579             eval {
580                 my $rec = $marc->as_xml_record(C4::Context->preference('marcflavour'));
581                 $rec =~ s!<\?xml version="1.0" encoding="UTF-8"\?>\n!!;
582                 print {$fh} $rec;
583                 $num_exported++;
584             };
585             if ($@) {
586               warn "Error exporting record $record_number ($record_type) XML";
587             }
588         }
589     }
590     print "\nRecords exported: $num_exported\n" if ( $verbose_logging );
591
592     print {$fh} $marcxml_close;
593
594     close $fh;
595     return $num_exported;
596 }
597
598 sub generate_deleted_marc_records {
599
600     my ($record_type, $entries, $directory) = @_;
601
602     my $records_deleted = {};
603     open my $fh, '>:encoding(UTF-8)', "$directory/exported_records" or die $!;
604
605     print {$fh} $marcxml_open;
606
607     my $i = 0;
608     foreach my $record_number (map { $_->{biblio_auth_number} } @$entries ) {
609         print "\r$i" unless ($i++ %100 or !$verbose_logging);
610         print "." if ( $verbose_logging );
611
612         my $marc = MARC::Record->new();
613         if ($record_type eq 'biblio') {
614             fix_biblio_ids($marc, $record_number, $record_number);
615         } else {
616             fix_authority_id($marc, $record_number);
617         }
618         if (C4::Context->preference("marcflavour") eq "UNIMARC") {
619             fix_unimarc_100($marc);
620         }
621
622         my $rec = $marc->as_xml_record(C4::Context->preference('marcflavour'));
623         # Remove the record's XML header
624         $rec =~ s!<\?xml version="1.0" encoding="UTF-8"\?>\n!!;
625         print {$fh} $rec;
626
627         $records_deleted->{$record_number} = 1;
628     }
629     print "\nRecords exported: $i\n" if ( $verbose_logging );
630
631     print {$fh} $marcxml_close;
632
633     close $fh;
634     return $records_deleted;
635 }
636
637 sub get_corrected_marc_record {
638     my ($record_type, $record_number) = @_;
639
640     my $marc = get_raw_marc_record($record_type, $record_number);
641
642     if (defined $marc) {
643         fix_leader($marc);
644         if ($record_type eq 'authority') {
645             fix_authority_id($marc, $record_number);
646         } elsif ($record_type eq 'biblio' && C4::Context->preference('IncludeSeeFromInSearches')) {
647             my $normalizer = Koha::RecordProcessor->new( { filters => 'EmbedSeeFromHeadings' } );
648             $marc = $normalizer->process($marc);
649         }
650         if (C4::Context->preference("marcflavour") eq "UNIMARC") {
651             fix_unimarc_100($marc);
652         }
653     }
654
655     return $marc;
656 }
657
658 sub get_raw_marc_record {
659     my ($record_type, $record_number) = @_;
660
661     my $marc;
662     if ($record_type eq 'biblio') {
663         eval { $marc = GetMarcBiblio($record_number, 1); };
664         if ($@ || !$marc) {
665             # here we do warn since catching an exception
666             # means that the bib was found but failed
667             # to be parsed
668             warn "error retrieving biblio $record_number";
669             return;
670         }
671     } else {
672         eval { $marc = GetAuthority($record_number); };
673         if ($@) {
674             warn "error retrieving authority $record_number";
675             return;
676         }
677     }
678     return $marc;
679 }
680
681 sub fix_leader {
682     # FIXME - this routine is suspect
683     # It blanks the Leader/00-05 and Leader/12-16 to
684     # force them to be recalculated correct when
685     # the $marc->as_usmarc() or $marc->as_xml() is called.
686     # But why is this necessary?  It would be a serious bug
687     # in MARC::Record (definitely) and MARC::File::XML (arguably)
688     # if they are emitting incorrect leader values.
689     my $marc = shift;
690
691     my $leader = $marc->leader;
692     substr($leader,  0, 5) = '     ';
693     substr($leader, 10, 7) = '22     ';
694     $marc->leader(substr($leader, 0, 24));
695 }
696
697 sub fix_biblio_ids {
698     # FIXME - it is essential to ensure that the biblionumber is present,
699     #         otherwise, Zebra will choke on the record.  However, this
700     #         logic belongs in the relevant C4::Biblio APIs.
701     my $marc = shift;
702     my $biblionumber = shift;
703     my $biblioitemnumber;
704     if (@_) {
705         $biblioitemnumber = shift;
706     } else {
707         my $sth = $dbh->prepare(
708             "SELECT biblioitemnumber FROM biblioitems WHERE biblionumber=?");
709         $sth->execute($biblionumber);
710         ($biblioitemnumber) = $sth->fetchrow_array;
711         $sth->finish;
712         unless ($biblioitemnumber) {
713             warn "failed to get biblioitemnumber for biblio $biblionumber";
714             return 0;
715         }
716     }
717
718     # FIXME - this is cheating on two levels
719     # 1. C4::Biblio::_koha_marc_update_bib_ids is meant to be an internal function
720     # 2. Making sure that the biblionumber and biblioitemnumber are correct and
721     #    present in the MARC::Record object ought to be part of GetMarcBiblio.
722     #
723     # On the other hand, this better for now than what rebuild_zebra.pl used to
724     # do, which was duplicate the code for inserting the biblionumber
725     # and biblioitemnumber
726     C4::Biblio::_koha_marc_update_bib_ids($marc, '', $biblionumber, $biblioitemnumber);
727
728     return 1;
729 }
730
731 sub fix_authority_id {
732     # FIXME - as with fix_biblio_ids, the authid must be present
733     #         for Zebra's sake.  However, this really belongs
734     #         in C4::AuthoritiesMarc.
735     my ($marc, $authid) = @_;
736     unless ($marc->field('001') and $marc->field('001')->data() eq $authid){
737         $marc->delete_field($marc->field('001'));
738         $marc->insert_fields_ordered(MARC::Field->new('001',$authid));
739     }
740 }
741
742 sub fix_unimarc_100 {
743     # FIXME - again, if this is necessary, it belongs in C4::AuthoritiesMarc.
744     my $marc = shift;
745
746     my $string;
747     my $length_100a = length($marc->subfield( 100, "a" ));
748     if (  $length_100a and $length_100a == 36 ) {
749         $string = $marc->subfield( 100, "a" );
750         my $f100 = $marc->field(100);
751         $marc->delete_field($f100);
752     }
753     else {
754         $string = POSIX::strftime( "%Y%m%d", localtime );
755         $string =~ s/\-//g;
756         $string = sprintf( "%-*s", 35, $string );
757     }
758     substr( $string, 22, 6, "frey50" );
759     $length_100a = length($marc->subfield( 100, "a" ));
760     unless ( $length_100a and $length_100a == 36 ) {
761         $marc->delete_field($marc->field(100));
762         $marc->insert_grouped_field(MARC::Field->new( 100, "", "", "a" => $string ));
763     }
764 }
765
766 sub do_indexing {
767     my ($record_type, $op, $record_dir, $reset_index, $noshadow, $record_format, $zebraidx_log_opt) = @_;
768
769     my $zebra_server  = ($record_type eq 'biblio') ? 'biblioserver' : 'authorityserver';
770     my $zebra_db_name = ($record_type eq 'biblio') ? 'biblios' : 'authorities';
771     my $zebra_config  = C4::Context->zebraconfig($zebra_server)->{'config'};
772     my $zebra_db_dir  = C4::Context->zebraconfig($zebra_server)->{'directory'};
773
774     $noshadow //= '';
775
776     if ($noshadow or $reset_index) {
777         $noshadow = '-n';
778     }
779
780     system("zebraidx -c $zebra_config $zebraidx_log_opt -g $record_format -d $zebra_db_name init") if $reset_index;
781     system("zebraidx -c $zebra_config $zebraidx_log_opt $noshadow -g $record_format -d $zebra_db_name $op $record_dir");
782     system("zebraidx -c $zebra_config $zebraidx_log_opt -g $record_format -d $zebra_db_name commit") unless $noshadow;
783 }
784
785 sub _flock {
786     # test if flock is present; if so, use it; if not, return true
787     # op refers to the official flock operations including LOCK_EX,
788     # LOCK_UN, etc.
789     # combining LOCK_EX with LOCK_NB returns immediately
790     my ($fh, $op)= @_;
791     if( !defined($use_flock) ) {
792         #check if flock is present; if not, you will have a fatal error
793         my $lock_acquired = eval { flock($fh, $op) };
794         # assuming that $fh and $op are fine(..), an undef $lock_acquired
795         # means no flock
796         $use_flock = defined($lock_acquired) ? 1 : 0;
797         print "Warning: flock could not be used!\n" if $verbose_logging && !$use_flock;
798         return 1 if !$use_flock;
799         return $lock_acquired;
800     } else {
801         return 1 if !$use_flock;
802         return flock($fh, $op);
803     }
804 }
805
806 sub _create_lockfile { #returns undef on failure
807     my $dir= shift;
808     unless (-d $dir) {
809         eval { mkpath($dir, 0, oct(755)) };
810         return if $@;
811     }
812     return if !open my $fh, q{>}, $dir.'/'.LOCK_FILENAME;
813     return ( $fh, $dir.'/'.LOCK_FILENAME );
814 }
815
816 sub print_usage {
817     print <<_USAGE_;
818 $0: reindex MARC bibs and/or authorities in Zebra.
819
820 Use this batch job to reindex all biblio or authority
821 records in your Koha database.
822
823 Parameters:
824
825     -b                      index bibliographic records
826
827     -a                      index authority records
828
829     -daemon                 Run in daemon mode.  The program will loop checking
830                             for entries on the zebraqueue table, processing
831                             them incrementally if present, and then sleep
832                             for a few seconds before repeating the process
833                             Checking the zebraqueue table is done with a cheap
834                             SQL query.  This allows for near realtime update of
835                             the zebra search index with low system overhead.
836                             Use -sleep to control the checking interval.
837
838                             Daemon mode implies -z, -a, -b.  The program will
839                             refuse to start if options are present that do not
840                             make sense while running as an incremental update
841                             daemon (e.g. -r or -offset).
842
843     -sleep 10               Seconds to sleep between checks of the zebraqueue
844                             table in daemon mode.  The default is 5 seconds.
845
846     -z                      select only updated and deleted
847                             records marked in the zebraqueue
848                             table.  Cannot be used with -r
849                             or -s.
850
851     --skip-deletes          only select record updates, not record
852                             deletions, to avoid potential excessive
853                             I/O when zebraidx processes deletions.
854                             If this option is used for normal indexing,
855                             a cronjob should be set up to run
856                             rebuild_zebra.pl -z without --skip-deletes
857                             during off hours.
858                             Only effective with -z.
859
860     -r                      clear Zebra index before
861                             adding records to index. Implies -w.
862
863     -d                      Temporary directory for indexing.
864                             If not specified, one is automatically
865                             created.  The export directory
866                             is automatically deleted unless
867                             you supply the -k switch.
868
869     -k                      Do not delete export directory.
870
871     -s                      Skip export.  Used if you have
872                             already exported the records
873                             in a previous run.
874
875     -nosanitize             export biblio/authority records directly from DB marcxml
876                             field without sanitizing records. It speed up
877                             dump process but could fail if DB contains badly
878                             encoded records. Works only with -x,
879
880     -w                      skip shadow indexing for this batch
881
882     -y                      do NOT clear zebraqueue after indexing; normally,
883                             after doing batch indexing, zebraqueue should be
884                             marked done for the affected record type(s) so that
885                             a running zebraqueue_daemon doesn't try to reindex
886                             the same records - specify -y to override this.
887                             Cannot be used with -z.
888
889     -v                      increase the amount of logging.  Normally only
890                             warnings and errors from the indexing are shown.
891                             Use log level 2 (-v -v) to include all Zebra logs.
892
893     --length   1234         how many biblio you want to export
894     --offset 1243           offset you want to start to
895                                 example: --offset 500 --length=500 will result in a LIMIT 500,1000 (exporting 1000 records, starting by the 500th one)
896                                 note that the numbers are NOT related to biblionumber, that's the intended behaviour.
897     --where                 let you specify a WHERE query, like itemtype='BOOK'
898                             or something like that
899
900     --run-as-root           explicitily allow script to run as 'root' user
901
902     --wait-for-lock         when not running in daemon mode, the default
903                             behavior is to abort a rebuild if the rebuild
904                             lock is busy.  This option will cause the program
905                             to wait for the lock to free and then continue
906                             processing the rebuild request,
907
908     --table                 specify a table (can be items, biblioitems or biblio) to retrieve biblionumber to index.
909                             biblioitems is the default value.
910
911     --help or -h            show this message.
912 _USAGE_
913 }