Bug 21908: Add DISTINCT biblionumber to rebuild_zebra.pl
[koha.git] / misc / migration_tools / rebuild_zebra.pl
1 #!/usr/bin/perl
2
3 # This file is part of Koha.
4 #
5 # Koha is free software; you can redistribute it and/or modify it
6 # under the terms of the GNU General Public License as published by
7 # the Free Software Foundation; either version 3 of the License, or
8 # (at your option) any later version.
9 #
10 # Koha is distributed in the hope that it will be useful, but
11 # WITHOUT ANY WARRANTY; without even the implied warranty of
12 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 # GNU General Public License for more details.
14 #
15 # You should have received a copy of the GNU General Public License
16 # along with Koha; if not, see <http://www.gnu.org/licenses>.
17
18 use Modern::Perl;
19
20 use C4::Context;
21 use Getopt::Long;
22 use Fcntl qw(:flock);
23 use File::Temp qw/ tempdir /;
24 use File::Path;
25 use C4::Biblio;
26 use C4::AuthoritiesMarc;
27 use C4::Items;
28 use Koha::RecordProcessor;
29 use Koha::Caches;
30 use XML::LibXML;
31
32 use constant LOCK_FILENAME => 'rebuild..LCK';
33
34 # script that checks zebradir structure & create directories & mandatory files if needed
35 #
36 #
37
38 $|=1; # flushes output
39 # If the cron job starts us in an unreadable dir, we will break without
40 # this.
41 chdir $ENV{HOME} if (!(-r '.'));
42 my $daemon_mode;
43 my $daemon_sleep = 5;
44 my $directory;
45 my $nosanitize;
46 my $skip_export;
47 my $keep_export;
48 my $skip_index;
49 my $reset;
50 my $biblios;
51 my $authorities;
52 my $as_xml;
53 my $noshadow;
54 my $want_help;
55 my $process_zebraqueue;
56 my $process_zebraqueue_skip_deletes;
57 my $do_not_clear_zebraqueue;
58 my $length;
59 my $where;
60 my $offset;
61 my $run_as_root;
62 my $run_user = (getpwuid($<))[0];
63 my $wait_for_lock = 0;
64 my $use_flock;
65 my $table = 'biblioitems';
66 my $is_memcached = Koha::Caches->get_instance->memcached_cache;
67
68 my $verbose_logging = 0;
69 my $zebraidx_log_opt = " -v none,fatal,warn ";
70 my $result = GetOptions(
71     'daemon'        => \$daemon_mode,
72     'sleep:i'       => \$daemon_sleep,
73     'd:s'           => \$directory,
74     'r|reset'       => \$reset,
75     's'             => \$skip_export,
76     'k'             => \$keep_export,
77     'I|skip-index'  => \$skip_index,
78     'nosanitize'    => \$nosanitize,
79     'b'             => \$biblios,
80     'w'             => \$noshadow,
81     'a'             => \$authorities,
82     'h|help'        => \$want_help,
83     'x'             => \$as_xml,
84     'y'             => \$do_not_clear_zebraqueue,
85     'z'             => \$process_zebraqueue,
86     'skip-deletes'  => \$process_zebraqueue_skip_deletes,
87     'where:s'       => \$where,
88     'length:i'      => \$length,
89     'offset:i'      => \$offset,
90     'v+'            => \$verbose_logging,
91     'run-as-root'   => \$run_as_root,
92     'wait-for-lock' => \$wait_for_lock,
93     't|table:s'     => \$table,
94 );
95
96 if (not $result or $want_help) {
97     print_usage();
98     exit 0;
99 }
100
101 if ( $as_xml ) {
102     warn "Warning: You passed -x which is already the default and is now deprecated\n";
103     undef $as_xml; # Should not be used later
104 }
105
106 if( not defined $run_as_root and $run_user eq 'root') {
107     my $msg = "Warning: You are running this script as the user 'root'.\n";
108     $msg   .= "If this is intentional you must explicitly specify this using the -run-as-root switch\n";
109     $msg   .= "Please do '$0 --help' to see usage.\n";
110     die $msg;
111 }
112
113 if ($process_zebraqueue and ($skip_export or $reset)) {
114     my $msg = "Cannot specify -r or -s if -z is specified\n";
115     $msg   .= "Please do '$0 --help' to see usage.\n";
116     die $msg;
117 }
118
119 if ($process_zebraqueue and $do_not_clear_zebraqueue) {
120     my $msg = "Cannot specify both -y and -z\n";
121     $msg   .= "Please do '$0 --help' to see usage.\n";
122     die $msg;
123 }
124
125 if ($daemon_mode) {
126     # incompatible flags handled above: help, reset, and do_not_clear_zebraqueue
127     if ($skip_export or $keep_export or $skip_index or
128           $where or $length or $offset) {
129         my $msg = "Cannot specify -s, -k, -I, -where, -length, or -offset with -daemon.\n";
130         $msg   .= "Please do '$0 --help' to see usage.\n";
131         die $msg;
132     }
133     unless ($is_memcached) {
134         warn "Warning: script running in daemon mode, without recommended caching system (memcached).\n";
135     }
136     $authorities = 1;
137     $biblios = 1;
138     $process_zebraqueue = 1;
139 }
140
141 if (not $biblios and not $authorities) {
142     my $msg = "Must specify -b or -a to reindex bibs or authorities\n";
143     $msg   .= "Please do '$0 --help' to see usage.\n";
144     die $msg;
145 }
146
147 our @tables_allowed_for_select = ( 'biblioitems', 'items', 'biblio', 'biblio_metadata' );
148 unless ( grep { /^$table$/ } @tables_allowed_for_select ) {
149     die "Cannot specify -t|--table with value '$table'. Only "
150       . ( join ', ', @tables_allowed_for_select )
151       . " are allowed.";
152 }
153
154
155 #  -v is for verbose, which seems backwards here because of how logging is set
156 #    on the CLI of zebraidx.  It works this way.  The default is to not log much
157 if ($verbose_logging >= 2) {
158     $zebraidx_log_opt = '-v none,fatal,warn,all';
159 }
160
161 my $use_tempdir = 0;
162 unless ($directory) {
163     $use_tempdir = 1;
164     $directory = tempdir(CLEANUP => ($keep_export ? 0 : 1));
165 }
166
167
168 my $biblioserverdir = C4::Context->zebraconfig('biblioserver')->{directory};
169 my $authorityserverdir = C4::Context->zebraconfig('authorityserver')->{directory};
170
171 my $kohadir = C4::Context->config('intranetdir');
172 my $bib_index_mode  = C4::Context->config('zebra_bib_index_mode')  // 'dom';
173 my $auth_index_mode = C4::Context->config('zebra_auth_index_mode') // 'dom';
174
175 my ($biblionumbertagfield,$biblionumbertagsubfield) = C4::Biblio::GetMarcFromKohaField("biblio.biblionumber","");
176 my ($biblioitemnumbertagfield,$biblioitemnumbertagsubfield) = C4::Biblio::GetMarcFromKohaField("biblioitems.biblioitemnumber","");
177
178 my $marcxml_open = q{<?xml version="1.0" encoding="UTF-8"?>
179 <collection xmlns="http://www.loc.gov/MARC21/slim">
180 };
181
182 my $marcxml_close = q{
183 </collection>
184 };
185
186 # Protect again simultaneous update of the zebra index by using a lock file.
187 # Create our own lock directory if it is missing. This should be created
188 # by koha-zebra-ctl.sh or at system installation. If the desired directory
189 # does not exist and cannot be created, we fall back on /tmp - which will
190 # always work.
191
192 my ($lockfile, $LockFH);
193 foreach (
194     C4::Context->config("zebra_lockdir"),
195     '/var/lock/zebra_' . C4::Context->config('database'),
196     '/tmp/zebra_' . C4::Context->config('database')
197 ) {
198     #we try three possibilities (we really want to lock :)
199     next if !$_;
200     ($LockFH, $lockfile) = _create_lockfile($_.'/rebuild');
201     last if defined $LockFH;
202 }
203 if( !defined $LockFH ) {
204     print "WARNING: Could not create lock file $lockfile: $!\n";
205     print "Please check your koha-conf.xml for ZEBRA_LOCKDIR.\n";
206     print "Verify file permissions for it too.\n";
207     $use_flock = 0; # we disable file locking now and will continue
208                     # without it
209                     # note that this mimics old behavior (before we used
210                     # the lockfile)
211 };
212
213 if ( $verbose_logging ) {
214     print "Zebra configuration information\n";
215     print "================================\n";
216     print "Zebra biblio directory      = $biblioserverdir\n";
217     print "Zebra authorities directory = $authorityserverdir\n";
218     print "Koha directory              = $kohadir\n";
219     print "Lockfile                    = $lockfile\n" if $lockfile;
220     print "BIBLIONUMBER in :     $biblionumbertagfield\$$biblionumbertagsubfield\n";
221     print "BIBLIOITEMNUMBER in : $biblioitemnumbertagfield\$$biblioitemnumbertagsubfield\n";
222     print "================================\n";
223 }
224
225 my $tester = XML::LibXML->new();
226 my $dbh;
227
228 # The main work is done here by calling do_one_pass().  We have added locking
229 # avoid race conditions between full rebuilds and incremental updates either from
230 # daemon mode or periodic invocation from cron.  The race can lead to an updated
231 # record being overwritten by a rebuild if the update is applied after the export
232 # by the rebuild and before the rebuild finishes (more likely to affect large
233 # catalogs).
234 #
235 # We have chosen to exit immediately by default if we cannot obtain the lock
236 # to prevent the potential for a infinite backlog from cron invocations, but an
237 # option (wait-for-lock) is provided to let the program wait for the lock.
238 # See http://bugs.koha-community.org/bugzilla3/show_bug.cgi?id=11078 for details.
239 if ($daemon_mode) {
240     while (1) {
241         # For incremental updates, skip the update if the updates are locked
242         if (_flock($LockFH, LOCK_EX|LOCK_NB)) {
243             eval {
244                 $dbh = C4::Context->dbh;
245                 if( zebraqueue_not_empty() ) {
246                     Koha::Caches->flush_L1_caches() if $is_memcached;
247                     do_one_pass();
248                 }
249             };
250             if ($@ && $verbose_logging) {
251                 warn "Warning : $@\n";
252             }
253             _flock($LockFH, LOCK_UN);
254         }
255         sleep $daemon_sleep;
256     }
257 } else {
258     # all one-off invocations
259     my $lock_mode = ($wait_for_lock) ? LOCK_EX : LOCK_EX|LOCK_NB;
260     if (_flock($LockFH, $lock_mode)) {
261         $dbh = C4::Context->dbh;
262         do_one_pass();
263         _flock($LockFH, LOCK_UN);
264     } else {
265         print "Skipping rebuild/update because flock failed on $lockfile: $!\n";
266     }
267 }
268
269
270 if ( $verbose_logging ) {
271     print "====================\n";
272     print "CLEANING\n";
273     print "====================\n";
274 }
275 if ($keep_export) {
276     print "NOTHING cleaned : the export $directory has been kept.\n";
277     print "You can re-run this script with the -s ";
278     if ($use_tempdir) {
279         print " and -d $directory parameters";
280     } else {
281         print "parameter";
282     }
283     print "\n";
284     print "if you just want to rebuild zebra after changing the record.abs\n";
285     print "or another zebra config file\n";
286 } else {
287     unless ($use_tempdir) {
288         # if we're using a temporary directory
289         # created by File::Temp, it will be removed
290         # automatically.
291         rmtree($directory, 0, 1);
292         print "directory $directory deleted\n";
293     }
294 }
295
296 sub do_one_pass {
297     if ($authorities) {
298         index_records('authority', $directory, $skip_export, $skip_index, $process_zebraqueue, $nosanitize, $do_not_clear_zebraqueue, $verbose_logging, $zebraidx_log_opt, $authorityserverdir);
299     } else {
300         print "skipping authorities\n" if ( $verbose_logging );
301     }
302
303     if ($biblios) {
304         index_records('biblio', $directory, $skip_export, $skip_index, $process_zebraqueue, $nosanitize, $do_not_clear_zebraqueue, $verbose_logging, $zebraidx_log_opt, $biblioserverdir);
305     } else {
306         print "skipping biblios\n" if ( $verbose_logging );
307     }
308 }
309
310 # Check the zebra update queue and return true if there are records to process
311 # This routine will handle each of -ab, -a, or -b, but in practice we force
312 # -ab when in daemon mode.
313 sub zebraqueue_not_empty {
314     my $where_str;
315
316     if ($authorities && $biblios) {
317         $where_str = 'done = 0;';
318     } elsif ($biblios) {
319         $where_str = 'server = "biblioserver" AND done = 0;';
320     } else {
321         $where_str = 'server = "authorityserver" AND done = 0;';
322     }
323     my $query =
324         $dbh->prepare('SELECT COUNT(*) FROM zebraqueue WHERE ' . $where_str );
325
326     $query->execute;
327     my $count = $query->fetchrow_arrayref->[0];
328     print "queued records: $count\n" if $verbose_logging > 0;
329     return $count > 0;
330 }
331
332 # This checks to see if the zebra directories exist under the provided path.
333 # If they don't, then zebra is likely to spit the dummy. This returns true
334 # if the directories had to be created, false otherwise.
335 sub check_zebra_dirs {
336     my ($base) = shift() . '/';
337     my $needed_repairing = 0;
338     my @dirs = ( '', 'key', 'register', 'shadow', 'tmp' );
339     foreach my $dir (@dirs) {
340         my $bdir = $base . $dir;
341         if (! -d $bdir) {
342             $needed_repairing = 1;
343             mkdir $bdir || die "Unable to create '$bdir': $!\n";
344             print "$0: needed to create '$bdir'\n";
345         }
346     }
347     return $needed_repairing;
348 }   # ----------  end of subroutine check_zebra_dirs  ----------
349
350 sub index_records {
351     my ($record_type, $directory, $skip_export, $skip_index, $process_zebraqueue, $nosanitize, $do_not_clear_zebraqueue, $verbose_logging, $zebraidx_log_opt, $server_dir) = @_;
352
353     my $num_records_exported = 0;
354     my $records_deleted = {};
355     my $need_reset = check_zebra_dirs($server_dir);
356     if ($need_reset) {
357         print "$0: found broken zebra server directories: forcing a rebuild\n";
358         $reset = 1;
359     }
360     if ($skip_export && $verbose_logging) {
361         print "====================\n";
362         print "SKIPPING $record_type export\n";
363         print "====================\n";
364     } else {
365         if ( $verbose_logging ) {
366             print "====================\n";
367             print "exporting $record_type\n";
368             print "====================\n";
369         }
370         mkdir "$directory" unless (-d $directory);
371         mkdir "$directory/$record_type" unless (-d "$directory/$record_type");
372         if ($process_zebraqueue) {
373             my $entries;
374
375             unless ( $process_zebraqueue_skip_deletes ) {
376                 $entries = select_zebraqueue_records($record_type, 'deleted');
377                 mkdir "$directory/del_$record_type" unless (-d "$directory/del_$record_type");
378                 $records_deleted = generate_deleted_marc_records($record_type, $entries, "$directory/del_$record_type");
379                 mark_zebraqueue_batch_done($entries);
380             }
381
382             $entries = select_zebraqueue_records($record_type, 'updated');
383             mkdir "$directory/upd_$record_type" unless (-d "$directory/upd_$record_type");
384             $num_records_exported = export_marc_records_from_list($record_type,$entries, "$directory/upd_$record_type", $records_deleted);
385             mark_zebraqueue_batch_done($entries);
386
387         } else {
388             my $sth = select_all_records($record_type);
389             $num_records_exported = export_marc_records_from_sth($record_type, $sth, "$directory/$record_type", $nosanitize);
390             unless ($do_not_clear_zebraqueue) {
391                 mark_all_zebraqueue_done($record_type);
392             }
393         }
394     }
395
396     #
397     # and reindexing everything
398     #
399     if ($skip_index) {
400         if ($verbose_logging) {
401             print "====================\n";
402             print "SKIPPING $record_type indexing\n";
403             print "====================\n";
404         }
405     } else {
406         if ( $verbose_logging ) {
407             print "====================\n";
408             print "REINDEXING zebra\n";
409             print "====================\n";
410         }
411         my $record_fmt = 'marcxml';
412         if ($process_zebraqueue) {
413             do_indexing($record_type, 'adelete', "$directory/del_$record_type", $reset, $noshadow, $record_fmt, $zebraidx_log_opt)
414                 if %$records_deleted;
415             do_indexing($record_type, 'update', "$directory/upd_$record_type", $reset, $noshadow, $record_fmt, $zebraidx_log_opt)
416                 if $num_records_exported;
417         } else {
418             do_indexing($record_type, 'update', "$directory/$record_type", $reset, $noshadow, $record_fmt, $zebraidx_log_opt)
419                 if ($num_records_exported or $skip_export);
420         }
421     }
422 }
423
424
425 sub select_zebraqueue_records {
426     my ($record_type, $update_type) = @_;
427
428     my $server = ($record_type eq 'biblio') ? 'biblioserver' : 'authorityserver';
429     my $op = ($update_type eq 'deleted') ? 'recordDelete' : 'specialUpdate';
430
431     my $sth = $dbh->prepare("SELECT id, biblio_auth_number
432                              FROM zebraqueue
433                              WHERE server = ?
434                              AND   operation = ?
435                              AND   done = 0
436                              ORDER BY id DESC");
437     $sth->execute($server, $op);
438     my $entries = $sth->fetchall_arrayref({});
439 }
440
441 sub mark_all_zebraqueue_done {
442     my ($record_type) = @_;
443
444     my $server = ($record_type eq 'biblio') ? 'biblioserver' : 'authorityserver';
445
446     my $sth = $dbh->prepare("UPDATE zebraqueue SET done = 1
447                              WHERE server = ?
448                              AND done = 0");
449     $sth->execute($server);
450 }
451
452 sub mark_zebraqueue_batch_done {
453     my ($entries) = @_;
454
455     $dbh->{AutoCommit} = 0;
456     my $sth = $dbh->prepare("UPDATE zebraqueue SET done = 1 WHERE id = ?");
457     $dbh->commit();
458     foreach my $id (map { $_->{id} } @$entries) {
459         $sth->execute($id);
460     }
461     $dbh->{AutoCommit} = 1;
462 }
463
464 sub select_all_records {
465     my $record_type = shift;
466     return ($record_type eq 'biblio') ? select_all_biblios() : select_all_authorities();
467 }
468
469 sub select_all_authorities {
470     my $strsth=qq{SELECT authid FROM auth_header};
471     $strsth.=qq{ WHERE $where } if ($where);
472     $strsth.=qq{ LIMIT $length } if ($length && !$offset);
473     $strsth.=qq{ LIMIT $offset,$length } if ($length && $offset);
474     my $sth = $dbh->prepare($strsth);
475     $sth->execute();
476     return $sth;
477 }
478
479 sub select_all_biblios {
480     $table = 'biblioitems'
481       unless grep { /^$table$/ } @tables_allowed_for_select;
482     my $strsth = qq{ SELECT DISTINCT(biblionumber) FROM $table };
483     $strsth.=qq{ WHERE $where } if ($where);
484     $strsth.=qq{ LIMIT $length } if ($length && !$offset);
485     $strsth.=qq{ LIMIT $offset,$length } if ($offset);
486     my $sth = $dbh->prepare($strsth);
487     $sth->execute();
488     return $sth;
489 }
490
491 sub export_marc_records_from_sth {
492     my ($record_type, $sth, $directory, $nosanitize) = @_;
493
494     my $num_exported = 0;
495     open my $fh, '>:encoding(UTF-8) ', "$directory/exported_records" or die $!;
496
497     print {$fh} $marcxml_open;
498
499     my $i = 0;
500     my ( $itemtag, $itemsubfield ) = C4::Biblio::GetMarcFromKohaField("items.itemnumber",'');
501     while (my ($record_number) = $sth->fetchrow_array) {
502         print "." if ( $verbose_logging );
503         print "\r$i" unless ($i++ %100 or !$verbose_logging);
504         if ( $nosanitize ) {
505             my $marcxml = $record_type eq 'biblio'
506                           ? GetXmlBiblio( $record_number )
507                           : GetAuthorityXML( $record_number );
508             if ($record_type eq 'biblio'){
509                 my @items = GetItemsInfo($record_number);
510                 if (@items){
511                     my $record = MARC::Record->new;
512                     $record->encoding('UTF-8');
513                     my @itemsrecord;
514                     foreach my $item (@items){
515                         my $record = Item2Marc($item, $record_number);
516                         push @itemsrecord, $record->field($itemtag);
517                     }
518                     $record->insert_fields_ordered(@itemsrecord);
519                     my $itemsxml = $record->as_xml_record();
520                     $marcxml =
521                         substr($marcxml, 0, length($marcxml)-10) .
522                         substr($itemsxml, index($itemsxml, "</leader>\n", 0) + 10);
523                 }
524             }
525             # extra test to ensure that result is valid XML; otherwise
526             # Zebra won't parse it in DOM mode
527             eval {
528                 my $doc = $tester->parse_string($marcxml);
529             };
530             if ($@) {
531                 warn "Error exporting record $record_number ($record_type): $@\n";
532                 next;
533             }
534             if ( $marcxml ) {
535                 $marcxml =~ s!<\?xml version="1.0" encoding="UTF-8"\?>\n!!;
536                 print {$fh} $marcxml;
537                 $num_exported++;
538             }
539             next;
540         }
541         my ($marc) = get_corrected_marc_record($record_type, $record_number);
542         if (defined $marc) {
543             eval {
544                 my $rec = $marc->as_xml_record(C4::Context->preference('marcflavour'));
545                 eval {
546                     my $doc = $tester->parse_string($rec);
547                 };
548                 if ($@) {
549                     die "invalid XML: $@";
550                 }
551                 $rec =~ s!<\?xml version="1.0" encoding="UTF-8"\?>\n!!;
552                 print {$fh} $rec;
553                 $num_exported++;
554             };
555             if ($@) {
556                 warn "Error exporting record $record_number ($record_type) XML";
557                 warn "... specific error is $@" if $verbose_logging;
558             }
559         }
560     }
561     print "\nRecords exported: $num_exported\n" if ( $verbose_logging );
562     print {$fh} $marcxml_close;
563
564     close $fh;
565     return $num_exported;
566 }
567
568 sub export_marc_records_from_list {
569     my ($record_type, $entries, $directory, $records_deleted) = @_;
570
571     my $num_exported = 0;
572     open my $fh, '>:encoding(UTF-8)', "$directory/exported_records" or die $!;
573
574     print {$fh} $marcxml_open;
575
576     my $i = 0;
577
578     # Skip any deleted records. We check for this anyway, but this reduces error spam
579     my %found = %$records_deleted;
580     foreach my $record_number ( map { $_->{biblio_auth_number} }
581                                 grep { !$found{ $_->{biblio_auth_number} }++ }
582                                 @$entries ) {
583         print "." if ( $verbose_logging );
584         print "\r$i" unless ($i++ %100 or !$verbose_logging);
585         my ($marc) = get_corrected_marc_record($record_type, $record_number);
586         if (defined $marc) {
587             eval {
588                 my $rec = $marc->as_xml_record(C4::Context->preference('marcflavour'));
589                 $rec =~ s!<\?xml version="1.0" encoding="UTF-8"\?>\n!!;
590                 print {$fh} $rec;
591                 $num_exported++;
592             };
593             if ($@) {
594               warn "Error exporting record $record_number ($record_type) XML";
595             }
596         }
597     }
598     print "\nRecords exported: $num_exported\n" if ( $verbose_logging );
599
600     print {$fh} $marcxml_close;
601
602     close $fh;
603     return $num_exported;
604 }
605
606 sub generate_deleted_marc_records {
607
608     my ($record_type, $entries, $directory) = @_;
609
610     my $records_deleted = {};
611     open my $fh, '>:encoding(UTF-8)', "$directory/exported_records" or die $!;
612
613     print {$fh} $marcxml_open;
614
615     my $i = 0;
616     foreach my $record_number (map { $_->{biblio_auth_number} } @$entries ) {
617         print "\r$i" unless ($i++ %100 or !$verbose_logging);
618         print "." if ( $verbose_logging );
619
620         my $marc = MARC::Record->new();
621         if ($record_type eq 'biblio') {
622             fix_biblio_ids($marc, $record_number, $record_number);
623         } else {
624             fix_authority_id($marc, $record_number);
625         }
626         if (C4::Context->preference("marcflavour") eq "UNIMARC") {
627             fix_unimarc_100($marc);
628         }
629
630         my $rec = $marc->as_xml_record(C4::Context->preference('marcflavour'));
631         # Remove the record's XML header
632         $rec =~ s!<\?xml version="1.0" encoding="UTF-8"\?>\n!!;
633         print {$fh} $rec;
634
635         $records_deleted->{$record_number} = 1;
636     }
637     print "\nRecords exported: $i\n" if ( $verbose_logging );
638
639     print {$fh} $marcxml_close;
640
641     close $fh;
642     return $records_deleted;
643 }
644
645 sub get_corrected_marc_record {
646     my ( $record_type, $record_number ) = @_;
647
648     my $marc = get_raw_marc_record( $record_type, $record_number );
649
650     if ( defined $marc ) {
651         fix_leader($marc);
652         if ( $record_type eq 'authority' ) {
653             fix_authority_id( $marc, $record_number );
654         }
655         elsif ( $record_type eq 'biblio' ) {
656
657             my @filters;
658             push @filters, 'EmbedItemsAvailability';
659             push @filters, 'EmbedSeeFromHeadings'
660                 if C4::Context->preference('IncludeSeeFromInSearches');
661
662             my $normalizer = Koha::RecordProcessor->new( { filters => \@filters } );
663             $marc = $normalizer->process($marc);
664         }
665         if ( C4::Context->preference("marcflavour") eq "UNIMARC" ) {
666             fix_unimarc_100($marc);
667         }
668     }
669
670     return $marc;
671 }
672
673 sub get_raw_marc_record {
674     my ($record_type, $record_number) = @_;
675
676     my $marc;
677     if ($record_type eq 'biblio') {
678         eval { $marc = C4::Biblio::GetMarcBiblio({ biblionumber => $record_number, embed_items => 1 }); };
679         if ($@ || !$marc) {
680             # here we do warn since catching an exception
681             # means that the bib was found but failed
682             # to be parsed
683             warn "error retrieving biblio $record_number";
684             return;
685         }
686     } else {
687         eval { $marc = GetAuthority($record_number); };
688         if ($@) {
689             warn "error retrieving authority $record_number";
690             return;
691         }
692     }
693     return $marc;
694 }
695
696 sub fix_leader {
697     # FIXME - this routine is suspect
698     # It blanks the Leader/00-05 and Leader/12-16 to
699     # force them to be recalculated correct when
700     # the $marc->as_usmarc() or $marc->as_xml() is called.
701     # But why is this necessary?  It would be a serious bug
702     # in MARC::Record (definitely) and MARC::File::XML (arguably)
703     # if they are emitting incorrect leader values.
704     my $marc = shift;
705
706     my $leader = $marc->leader;
707     substr($leader,  0, 5) = '     ';
708     substr($leader, 10, 7) = '22     ';
709     $marc->leader(substr($leader, 0, 24));
710 }
711
712 sub fix_biblio_ids {
713     # FIXME - it is essential to ensure that the biblionumber is present,
714     #         otherwise, Zebra will choke on the record.  However, this
715     #         logic belongs in the relevant C4::Biblio APIs.
716     my $marc = shift;
717     my $biblionumber = shift;
718     my $biblioitemnumber;
719     if (@_) {
720         $biblioitemnumber = shift;
721     } else {
722         my $sth = $dbh->prepare(
723             "SELECT biblioitemnumber FROM biblioitems WHERE biblionumber=?");
724         $sth->execute($biblionumber);
725         ($biblioitemnumber) = $sth->fetchrow_array;
726         $sth->finish;
727         unless ($biblioitemnumber) {
728             warn "failed to get biblioitemnumber for biblio $biblionumber";
729             return 0;
730         }
731     }
732
733     # FIXME - this is cheating on two levels
734     # 1. C4::Biblio::_koha_marc_update_bib_ids is meant to be an internal function
735     # 2. Making sure that the biblionumber and biblioitemnumber are correct and
736     #    present in the MARC::Record object ought to be part of GetMarcBiblio.
737     #
738     # On the other hand, this better for now than what rebuild_zebra.pl used to
739     # do, which was duplicate the code for inserting the biblionumber
740     # and biblioitemnumber
741     C4::Biblio::_koha_marc_update_bib_ids($marc, '', $biblionumber, $biblioitemnumber);
742
743     return 1;
744 }
745
746 sub fix_authority_id {
747     # FIXME - as with fix_biblio_ids, the authid must be present
748     #         for Zebra's sake.  However, this really belongs
749     #         in C4::AuthoritiesMarc.
750     my ($marc, $authid) = @_;
751     unless ($marc->field('001') and $marc->field('001')->data() eq $authid){
752         $marc->delete_field($marc->field('001'));
753         $marc->insert_fields_ordered(MARC::Field->new('001',$authid));
754     }
755 }
756
757 sub fix_unimarc_100 {
758     # FIXME - again, if this is necessary, it belongs in C4::AuthoritiesMarc.
759     my $marc = shift;
760
761     my $string;
762     my $length_100a = length($marc->subfield( 100, "a" ));
763     if (  $length_100a and $length_100a == 36 ) {
764         $string = $marc->subfield( 100, "a" );
765         my $f100 = $marc->field(100);
766         $marc->delete_field($f100);
767     }
768     else {
769         $string = POSIX::strftime( "%Y%m%d", localtime );
770         $string =~ s/\-//g;
771         $string = sprintf( "%-*s", 35, $string );
772     }
773     substr( $string, 22, 6, "frey50" );
774     $length_100a = length($marc->subfield( 100, "a" ));
775     unless ( $length_100a and $length_100a == 36 ) {
776         $marc->delete_field($marc->field(100));
777         $marc->insert_grouped_field(MARC::Field->new( 100, "", "", "a" => $string ));
778     }
779 }
780
781 sub do_indexing {
782     my ($record_type, $op, $record_dir, $reset_index, $noshadow, $record_format, $zebraidx_log_opt) = @_;
783
784     my $zebra_server  = ($record_type eq 'biblio') ? 'biblioserver' : 'authorityserver';
785     my $zebra_db_name = ($record_type eq 'biblio') ? 'biblios' : 'authorities';
786     my $zebra_config  = C4::Context->zebraconfig($zebra_server)->{'config'};
787     my $zebra_db_dir  = C4::Context->zebraconfig($zebra_server)->{'directory'};
788
789     $noshadow //= '';
790
791     if ($noshadow or $reset_index) {
792         $noshadow = '-n';
793     }
794
795     system("zebraidx -c $zebra_config $zebraidx_log_opt -g $record_format -d $zebra_db_name init") if $reset_index;
796     system("zebraidx -c $zebra_config $zebraidx_log_opt $noshadow -g $record_format -d $zebra_db_name $op $record_dir");
797     system("zebraidx -c $zebra_config $zebraidx_log_opt -g $record_format -d $zebra_db_name commit") unless $noshadow;
798 }
799
800 sub _flock {
801     # test if flock is present; if so, use it; if not, return true
802     # op refers to the official flock operations including LOCK_EX,
803     # LOCK_UN, etc.
804     # combining LOCK_EX with LOCK_NB returns immediately
805     my ($fh, $op)= @_;
806     if( !defined($use_flock) ) {
807         #check if flock is present; if not, you will have a fatal error
808         my $lock_acquired = eval { flock($fh, $op) };
809         # assuming that $fh and $op are fine(..), an undef $lock_acquired
810         # means no flock
811         $use_flock = defined($lock_acquired) ? 1 : 0;
812         print "Warning: flock could not be used!\n" if $verbose_logging && !$use_flock;
813         return 1 if !$use_flock;
814         return $lock_acquired;
815     } else {
816         return 1 if !$use_flock;
817         return flock($fh, $op);
818     }
819 }
820
821 sub _create_lockfile { #returns undef on failure
822     my $dir= shift;
823     unless (-d $dir) {
824         eval { mkpath($dir, 0, oct(755)) };
825         return if $@;
826     }
827     return if !open my $fh, q{>}, $dir.'/'.LOCK_FILENAME;
828     return ( $fh, $dir.'/'.LOCK_FILENAME );
829 }
830
831 sub print_usage {
832     print <<_USAGE_;
833 $0: reindex MARC bibs and/or authorities in Zebra.
834
835 Use this batch job to reindex all biblio or authority
836 records in your Koha database.
837
838 Parameters:
839
840     -b                      index bibliographic records
841
842     -a                      index authority records
843
844     -daemon                 Run in daemon mode.  The program will loop checking
845                             for entries on the zebraqueue table, processing
846                             them incrementally if present, and then sleep
847                             for a few seconds before repeating the process
848                             Checking the zebraqueue table is done with a cheap
849                             SQL query.  This allows for near realtime update of
850                             the zebra search index with low system overhead.
851                             Use -sleep to control the checking interval.
852
853                             Daemon mode implies -z, -a, -b.  The program will
854                             refuse to start if options are present that do not
855                             make sense while running as an incremental update
856                             daemon (e.g. -r or -offset).
857
858     -sleep 10               Seconds to sleep between checks of the zebraqueue
859                             table in daemon mode.  The default is 5 seconds.
860
861     -z                      select only updated and deleted
862                             records marked in the zebraqueue
863                             table.  Cannot be used with -r
864                             or -s.
865
866     --skip-deletes          only select record updates, not record
867                             deletions, to avoid potential excessive
868                             I/O when zebraidx processes deletions.
869                             If this option is used for normal indexing,
870                             a cronjob should be set up to run
871                             rebuild_zebra.pl -z without --skip-deletes
872                             during off hours.
873                             Only effective with -z.
874
875     -r                      clear Zebra index before
876                             adding records to index. Implies -w.
877
878     -d                      Temporary directory for indexing.
879                             If not specified, one is automatically
880                             created.  The export directory
881                             is automatically deleted unless
882                             you supply the -k switch.
883
884     -k                      Do not delete export directory.
885
886     -s                      Skip export.  Used if you have
887                             already exported the records
888                             in a previous run.
889
890     -nosanitize             export biblio/authority records directly from DB marcxml
891                             field without sanitizing records. It speed up
892                             dump process but could fail if DB contains badly
893                             encoded records. Works only with -x,
894
895     -w                      skip shadow indexing for this batch
896
897     -y                      do NOT clear zebraqueue after indexing; normally,
898                             after doing batch indexing, zebraqueue should be
899                             marked done for the affected record type(s) so that
900                             a running zebraqueue_daemon doesn't try to reindex
901                             the same records - specify -y to override this.
902                             Cannot be used with -z.
903
904     -v                      increase the amount of logging.  Normally only
905                             warnings and errors from the indexing are shown.
906                             Use log level 2 (-v -v) to include all Zebra logs.
907
908     --length   1234         how many biblio you want to export
909     --offset 1243           offset you want to start to
910                                 example: --offset 500 --length=500 will result in a LIMIT 500,1000 (exporting 1000 records, starting by the 500th one)
911                                 note that the numbers are NOT related to biblionumber, that's the intended behaviour.
912     --where                 let you specify a WHERE query, like itemtype='BOOK'
913                             or something like that
914
915     --run-as-root           explicitily allow script to run as 'root' user
916
917     --wait-for-lock         when not running in daemon mode, the default
918                             behavior is to abort a rebuild if the rebuild
919                             lock is busy.  This option will cause the program
920                             to wait for the lock to free and then continue
921                             processing the rebuild request,
922
923     --table                 specify a table (can be items, biblioitems, biblio, biblio_metadata) to retrieve biblionumber to index.
924                             biblioitems is the default value.
925
926     --help or -h            show this message.
927 _USAGE_
928 }