Bug 22721: Remove frameworkcode parameter in GetMarcFromKohaField calls
[koha-equinox.git] / misc / migration_tools / rebuild_zebra.pl
1 #!/usr/bin/perl
2
3 # This file is part of Koha.
4 #
5 # Koha is free software; you can redistribute it and/or modify it
6 # under the terms of the GNU General Public License as published by
7 # the Free Software Foundation; either version 3 of the License, or
8 # (at your option) any later version.
9 #
10 # Koha is distributed in the hope that it will be useful, but
11 # WITHOUT ANY WARRANTY; without even the implied warranty of
12 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 # GNU General Public License for more details.
14 #
15 # You should have received a copy of the GNU General Public License
16 # along with Koha; if not, see <http://www.gnu.org/licenses>.
17
18 use Modern::Perl;
19
20 use Koha::Script;
21 use C4::Context;
22 use Getopt::Long;
23 use Fcntl qw(:flock);
24 use File::Temp qw/ tempdir /;
25 use File::Path;
26 use C4::Biblio;
27 use C4::AuthoritiesMarc;
28 use C4::Items;
29 use Koha::RecordProcessor;
30 use Koha::Caches;
31 use XML::LibXML;
32
33 use constant LOCK_FILENAME => 'rebuild..LCK';
34
35 # script that checks zebradir structure & create directories & mandatory files if needed
36 #
37 #
38
39 $|=1; # flushes output
40 # If the cron job starts us in an unreadable dir, we will break without
41 # this.
42 chdir $ENV{HOME} if (!(-r '.'));
43 my $daemon_mode;
44 my $daemon_sleep = 5;
45 my $directory;
46 my $nosanitize;
47 my $skip_export;
48 my $keep_export;
49 my $skip_index;
50 my $reset;
51 my $biblios;
52 my $authorities;
53 my $as_xml;
54 my $noshadow;
55 my $want_help;
56 my $process_zebraqueue;
57 my $process_zebraqueue_skip_deletes;
58 my $do_not_clear_zebraqueue;
59 my $length;
60 my $where;
61 my $offset;
62 my $run_as_root;
63 my $run_user = (getpwuid($<))[0];
64 my $wait_for_lock = 0;
65 my $use_flock;
66 my $table = 'biblioitems';
67 my $is_memcached = Koha::Caches->get_instance->memcached_cache;
68
69 my $verbose_logging = 0;
70 my $zebraidx_log_opt = " -v none,fatal,warn ";
71 my $result = GetOptions(
72     'daemon'        => \$daemon_mode,
73     'sleep:i'       => \$daemon_sleep,
74     'd:s'           => \$directory,
75     'r|reset'       => \$reset,
76     's'             => \$skip_export,
77     'k'             => \$keep_export,
78     'I|skip-index'  => \$skip_index,
79     'nosanitize'    => \$nosanitize,
80     'b'             => \$biblios,
81     'w'             => \$noshadow,
82     'a'             => \$authorities,
83     'h|help'        => \$want_help,
84     'x'             => \$as_xml,
85     'y'             => \$do_not_clear_zebraqueue,
86     'z'             => \$process_zebraqueue,
87     'skip-deletes'  => \$process_zebraqueue_skip_deletes,
88     'where:s'       => \$where,
89     'length:i'      => \$length,
90     'offset:i'      => \$offset,
91     'v+'            => \$verbose_logging,
92     'run-as-root'   => \$run_as_root,
93     'wait-for-lock' => \$wait_for_lock,
94     't|table:s'     => \$table,
95 );
96
97 if (not $result or $want_help) {
98     print_usage();
99     exit 0;
100 }
101
102 if ( $as_xml ) {
103     warn "Warning: You passed -x which is already the default and is now deprecated\n";
104     undef $as_xml; # Should not be used later
105 }
106
107 if( not defined $run_as_root and $run_user eq 'root') {
108     my $msg = "Warning: You are running this script as the user 'root'.\n";
109     $msg   .= "If this is intentional you must explicitly specify this using the -run-as-root switch\n";
110     $msg   .= "Please do '$0 --help' to see usage.\n";
111     die $msg;
112 }
113
114 if ($process_zebraqueue and ($skip_export or $reset)) {
115     my $msg = "Cannot specify -r or -s if -z is specified\n";
116     $msg   .= "Please do '$0 --help' to see usage.\n";
117     die $msg;
118 }
119
120 if ($process_zebraqueue and $do_not_clear_zebraqueue) {
121     my $msg = "Cannot specify both -y and -z\n";
122     $msg   .= "Please do '$0 --help' to see usage.\n";
123     die $msg;
124 }
125
126 if ($daemon_mode) {
127     # incompatible flags handled above: help, reset, and do_not_clear_zebraqueue
128     if ($skip_export or $keep_export or $skip_index or
129           $where or $length or $offset) {
130         my $msg = "Cannot specify -s, -k, -I, -where, -length, or -offset with -daemon.\n";
131         $msg   .= "Please do '$0 --help' to see usage.\n";
132         die $msg;
133     }
134     unless ($is_memcached) {
135         warn "Warning: script running in daemon mode, without recommended caching system (memcached).\n";
136     }
137     $authorities = 1;
138     $biblios = 1;
139     $process_zebraqueue = 1;
140 }
141
142 if (not $biblios and not $authorities) {
143     my $msg = "Must specify -b or -a to reindex bibs or authorities\n";
144     $msg   .= "Please do '$0 --help' to see usage.\n";
145     die $msg;
146 }
147
148 our @tables_allowed_for_select = ( 'biblioitems', 'items', 'biblio', 'biblio_metadata' );
149 unless ( grep { /^$table$/ } @tables_allowed_for_select ) {
150     die "Cannot specify -t|--table with value '$table'. Only "
151       . ( join ', ', @tables_allowed_for_select )
152       . " are allowed.";
153 }
154
155
156 #  -v is for verbose, which seems backwards here because of how logging is set
157 #    on the CLI of zebraidx.  It works this way.  The default is to not log much
158 if ($verbose_logging >= 2) {
159     $zebraidx_log_opt = '-v none,fatal,warn,all';
160 }
161
162 my $use_tempdir = 0;
163 unless ($directory) {
164     $use_tempdir = 1;
165     $directory = tempdir(CLEANUP => ($keep_export ? 0 : 1));
166 }
167
168
169 my $biblioserverdir = C4::Context->zebraconfig('biblioserver')->{directory};
170 my $authorityserverdir = C4::Context->zebraconfig('authorityserver')->{directory};
171
172 my $kohadir = C4::Context->config('intranetdir');
173
174 my ($biblionumbertagfield,$biblionumbertagsubfield) = C4::Biblio::GetMarcFromKohaField( "biblio.biblionumber" );
175 my ($biblioitemnumbertagfield,$biblioitemnumbertagsubfield) = C4::Biblio::GetMarcFromKohaField( "biblioitems.biblioitemnumber" );
176
177 my $marcxml_open = q{<?xml version="1.0" encoding="UTF-8"?>
178 <collection xmlns="http://www.loc.gov/MARC21/slim">
179 };
180
181 my $marcxml_close = q{
182 </collection>
183 };
184
185 # Protect again simultaneous update of the zebra index by using a lock file.
186 # Create our own lock directory if it is missing. This should be created
187 # by koha-zebra-ctl.sh or at system installation. If the desired directory
188 # does not exist and cannot be created, we fall back on /tmp - which will
189 # always work.
190
191 my ($lockfile, $LockFH);
192 foreach (
193     C4::Context->config("zebra_lockdir"),
194     '/var/lock/zebra_' . C4::Context->config('database'),
195     '/tmp/zebra_' . C4::Context->config('database')
196 ) {
197     #we try three possibilities (we really want to lock :)
198     next if !$_;
199     ($LockFH, $lockfile) = _create_lockfile($_.'/rebuild');
200     last if defined $LockFH;
201 }
202 if( !defined $LockFH ) {
203     print "WARNING: Could not create lock file $lockfile: $!\n";
204     print "Please check your koha-conf.xml for ZEBRA_LOCKDIR.\n";
205     print "Verify file permissions for it too.\n";
206     $use_flock = 0; # we disable file locking now and will continue
207                     # without it
208                     # note that this mimics old behavior (before we used
209                     # the lockfile)
210 };
211
212 if ( $verbose_logging ) {
213     print "Zebra configuration information\n";
214     print "================================\n";
215     print "Zebra biblio directory      = $biblioserverdir\n";
216     print "Zebra authorities directory = $authorityserverdir\n";
217     print "Koha directory              = $kohadir\n";
218     print "Lockfile                    = $lockfile\n" if $lockfile;
219     print "BIBLIONUMBER in :     $biblionumbertagfield\$$biblionumbertagsubfield\n";
220     print "BIBLIOITEMNUMBER in : $biblioitemnumbertagfield\$$biblioitemnumbertagsubfield\n";
221     print "================================\n";
222 }
223
224 my $tester = XML::LibXML->new();
225 my $dbh;
226
227 # The main work is done here by calling do_one_pass().  We have added locking
228 # avoid race conditions between full rebuilds and incremental updates either from
229 # daemon mode or periodic invocation from cron.  The race can lead to an updated
230 # record being overwritten by a rebuild if the update is applied after the export
231 # by the rebuild and before the rebuild finishes (more likely to affect large
232 # catalogs).
233 #
234 # We have chosen to exit immediately by default if we cannot obtain the lock
235 # to prevent the potential for a infinite backlog from cron invocations, but an
236 # option (wait-for-lock) is provided to let the program wait for the lock.
237 # See http://bugs.koha-community.org/bugzilla3/show_bug.cgi?id=11078 for details.
238 if ($daemon_mode) {
239     while (1) {
240         # For incremental updates, skip the update if the updates are locked
241         if (_flock($LockFH, LOCK_EX|LOCK_NB)) {
242             eval {
243                 $dbh = C4::Context->dbh;
244                 if( zebraqueue_not_empty() ) {
245                     Koha::Caches->flush_L1_caches() if $is_memcached;
246                     do_one_pass();
247                 }
248             };
249             if ($@ && $verbose_logging) {
250                 warn "Warning : $@\n";
251             }
252             _flock($LockFH, LOCK_UN);
253         }
254         sleep $daemon_sleep;
255     }
256 } else {
257     # all one-off invocations
258     my $lock_mode = ($wait_for_lock) ? LOCK_EX : LOCK_EX|LOCK_NB;
259     if (_flock($LockFH, $lock_mode)) {
260         $dbh = C4::Context->dbh;
261         do_one_pass();
262         _flock($LockFH, LOCK_UN);
263     } else {
264         print "Skipping rebuild/update because flock failed on $lockfile: $!\n";
265     }
266 }
267
268
269 if ( $verbose_logging ) {
270     print "====================\n";
271     print "CLEANING\n";
272     print "====================\n";
273 }
274 if ($keep_export) {
275     print "NOTHING cleaned : the export $directory has been kept.\n";
276     print "You can re-run this script with the -s ";
277     if ($use_tempdir) {
278         print " and -d $directory parameters";
279     } else {
280         print "parameter";
281     }
282     print "\n";
283     print "if you just want to rebuild zebra after changing zebra config files\n";
284 } else {
285     unless ($use_tempdir) {
286         # if we're using a temporary directory
287         # created by File::Temp, it will be removed
288         # automatically.
289         rmtree($directory, 0, 1);
290         print "directory $directory deleted\n";
291     }
292 }
293
294 sub do_one_pass {
295     if ($authorities) {
296         index_records('authority', $directory, $skip_export, $skip_index, $process_zebraqueue, $nosanitize, $do_not_clear_zebraqueue, $verbose_logging, $zebraidx_log_opt, $authorityserverdir);
297     } else {
298         print "skipping authorities\n" if ( $verbose_logging );
299     }
300
301     if ($biblios) {
302         index_records('biblio', $directory, $skip_export, $skip_index, $process_zebraqueue, $nosanitize, $do_not_clear_zebraqueue, $verbose_logging, $zebraidx_log_opt, $biblioserverdir);
303     } else {
304         print "skipping biblios\n" if ( $verbose_logging );
305     }
306 }
307
308 # Check the zebra update queue and return true if there are records to process
309 # This routine will handle each of -ab, -a, or -b, but in practice we force
310 # -ab when in daemon mode.
311 sub zebraqueue_not_empty {
312     my $where_str;
313
314     if ($authorities && $biblios) {
315         $where_str = 'done = 0;';
316     } elsif ($biblios) {
317         $where_str = 'server = "biblioserver" AND done = 0;';
318     } else {
319         $where_str = 'server = "authorityserver" AND done = 0;';
320     }
321     my $query =
322         $dbh->prepare('SELECT COUNT(*) FROM zebraqueue WHERE ' . $where_str );
323
324     $query->execute;
325     my $count = $query->fetchrow_arrayref->[0];
326     print "queued records: $count\n" if $verbose_logging > 0;
327     return $count > 0;
328 }
329
330 # This checks to see if the zebra directories exist under the provided path.
331 # If they don't, then zebra is likely to spit the dummy. This returns true
332 # if the directories had to be created, false otherwise.
333 sub check_zebra_dirs {
334     my ($base) = shift() . '/';
335     my $needed_repairing = 0;
336     my @dirs = ( '', 'key', 'register', 'shadow', 'tmp' );
337     foreach my $dir (@dirs) {
338         my $bdir = $base . $dir;
339         if (! -d $bdir) {
340             $needed_repairing = 1;
341             mkdir $bdir || die "Unable to create '$bdir': $!\n";
342             print "$0: needed to create '$bdir'\n";
343         }
344     }
345     return $needed_repairing;
346 }   # ----------  end of subroutine check_zebra_dirs  ----------
347
348 sub index_records {
349     my ($record_type, $directory, $skip_export, $skip_index, $process_zebraqueue, $nosanitize, $do_not_clear_zebraqueue, $verbose_logging, $zebraidx_log_opt, $server_dir) = @_;
350
351     my $num_records_exported = 0;
352     my $records_deleted = {};
353     my $need_reset = check_zebra_dirs($server_dir);
354     if ($need_reset) {
355         print "$0: found broken zebra server directories: forcing a rebuild\n";
356         $reset = 1;
357     }
358     if ($skip_export && $verbose_logging) {
359         print "====================\n";
360         print "SKIPPING $record_type export\n";
361         print "====================\n";
362     } else {
363         if ( $verbose_logging ) {
364             print "====================\n";
365             print "exporting $record_type\n";
366             print "====================\n";
367         }
368         mkdir "$directory" unless (-d $directory);
369         mkdir "$directory/$record_type" unless (-d "$directory/$record_type");
370         if ($process_zebraqueue) {
371             my $entries;
372
373             unless ( $process_zebraqueue_skip_deletes ) {
374                 $entries = select_zebraqueue_records($record_type, 'deleted');
375                 mkdir "$directory/del_$record_type" unless (-d "$directory/del_$record_type");
376                 $records_deleted = generate_deleted_marc_records($record_type, $entries, "$directory/del_$record_type");
377                 mark_zebraqueue_batch_done($entries);
378             }
379
380             $entries = select_zebraqueue_records($record_type, 'updated');
381             mkdir "$directory/upd_$record_type" unless (-d "$directory/upd_$record_type");
382             $num_records_exported = export_marc_records_from_list($record_type,$entries, "$directory/upd_$record_type", $records_deleted);
383             mark_zebraqueue_batch_done($entries);
384
385         } else {
386             my $sth = select_all_records($record_type);
387             $num_records_exported = export_marc_records_from_sth($record_type, $sth, "$directory/$record_type", $nosanitize);
388             unless ($do_not_clear_zebraqueue) {
389                 mark_all_zebraqueue_done($record_type);
390             }
391         }
392     }
393
394     #
395     # and reindexing everything
396     #
397     if ($skip_index) {
398         if ($verbose_logging) {
399             print "====================\n";
400             print "SKIPPING $record_type indexing\n";
401             print "====================\n";
402         }
403     } else {
404         if ( $verbose_logging ) {
405             print "====================\n";
406             print "REINDEXING zebra\n";
407             print "====================\n";
408         }
409         my $record_fmt = 'marcxml';
410         if ($process_zebraqueue) {
411             do_indexing($record_type, 'adelete', "$directory/del_$record_type", $reset, $noshadow, $record_fmt, $zebraidx_log_opt)
412                 if %$records_deleted;
413             do_indexing($record_type, 'update', "$directory/upd_$record_type", $reset, $noshadow, $record_fmt, $zebraidx_log_opt)
414                 if $num_records_exported;
415         } else {
416             do_indexing($record_type, 'update', "$directory/$record_type", $reset, $noshadow, $record_fmt, $zebraidx_log_opt)
417                 if ($num_records_exported or $skip_export);
418         }
419     }
420 }
421
422
423 sub select_zebraqueue_records {
424     my ($record_type, $update_type) = @_;
425
426     my $server = ($record_type eq 'biblio') ? 'biblioserver' : 'authorityserver';
427     my $op = ($update_type eq 'deleted') ? 'recordDelete' : 'specialUpdate';
428
429     my $sth = $dbh->prepare("SELECT id, biblio_auth_number
430                              FROM zebraqueue
431                              WHERE server = ?
432                              AND   operation = ?
433                              AND   done = 0
434                              ORDER BY id DESC");
435     $sth->execute($server, $op);
436     my $entries = $sth->fetchall_arrayref({});
437 }
438
439 sub mark_all_zebraqueue_done {
440     my ($record_type) = @_;
441
442     my $server = ($record_type eq 'biblio') ? 'biblioserver' : 'authorityserver';
443
444     my $sth = $dbh->prepare("UPDATE zebraqueue SET done = 1
445                              WHERE server = ?
446                              AND done = 0");
447     $sth->execute($server);
448 }
449
450 sub mark_zebraqueue_batch_done {
451     my ($entries) = @_;
452
453     $dbh->{AutoCommit} = 0;
454     my $sth = $dbh->prepare("UPDATE zebraqueue SET done = 1 WHERE id = ?");
455     $dbh->commit();
456     foreach my $id (map { $_->{id} } @$entries) {
457         $sth->execute($id);
458     }
459     $dbh->{AutoCommit} = 1;
460 }
461
462 sub select_all_records {
463     my $record_type = shift;
464     return ($record_type eq 'biblio') ? select_all_biblios() : select_all_authorities();
465 }
466
467 sub select_all_authorities {
468     my $strsth=qq{SELECT authid FROM auth_header};
469     $strsth.=qq{ WHERE $where } if ($where);
470     $strsth.=qq{ LIMIT $length } if ($length && !$offset);
471     $strsth.=qq{ LIMIT $offset,$length } if ($length && $offset);
472     my $sth = $dbh->prepare($strsth);
473     $sth->execute();
474     return $sth;
475 }
476
477 sub select_all_biblios {
478     $table = 'biblioitems'
479       unless grep { /^$table$/ } @tables_allowed_for_select;
480     my $strsth = qq{ SELECT DISTINCT biblionumber FROM $table };
481     $strsth.=qq{ WHERE $where } if ($where);
482     $strsth.=qq{ LIMIT $length } if ($length && !$offset);
483     $strsth.=qq{ LIMIT $offset,$length } if ($offset);
484     my $sth = $dbh->prepare($strsth);
485     $sth->execute();
486     return $sth;
487 }
488
489 sub export_marc_records_from_sth {
490     my ($record_type, $sth, $directory, $nosanitize) = @_;
491
492     my $num_exported = 0;
493     open my $fh, '>:encoding(UTF-8) ', "$directory/exported_records" or die $!;
494
495     print {$fh} $marcxml_open;
496
497     my $i = 0;
498     my ( $itemtag, $itemsubfield ) = C4::Biblio::GetMarcFromKohaField( "items.itemnumber" );
499     while (my ($record_number) = $sth->fetchrow_array) {
500         print "." if ( $verbose_logging );
501         print "\r$i" unless ($i++ %100 or !$verbose_logging);
502         if ( $nosanitize ) {
503             my $marcxml = $record_type eq 'biblio'
504                           ? GetXmlBiblio( $record_number )
505                           : GetAuthorityXML( $record_number );
506             if ($record_type eq 'biblio'){
507                 my @items = GetItemsInfo($record_number);
508                 if (@items){
509                     my $record = MARC::Record->new;
510                     $record->encoding('UTF-8');
511                     my @itemsrecord;
512                     foreach my $item (@items){
513                         my $record = Item2Marc($item, $record_number);
514                         push @itemsrecord, $record->field($itemtag);
515                     }
516                     $record->insert_fields_ordered(@itemsrecord);
517                     my $itemsxml = $record->as_xml_record();
518                     $marcxml =
519                         substr($marcxml, 0, length($marcxml)-10) .
520                         substr($itemsxml, index($itemsxml, "</leader>\n", 0) + 10);
521                 }
522             }
523             # extra test to ensure that result is valid XML; otherwise
524             # Zebra won't parse it in DOM mode
525             eval {
526                 my $doc = $tester->parse_string($marcxml);
527             };
528             if ($@) {
529                 warn "Error exporting record $record_number ($record_type): $@\n";
530                 next;
531             }
532             if ( $marcxml ) {
533                 $marcxml =~ s!<\?xml version="1.0" encoding="UTF-8"\?>\n!!;
534                 print {$fh} $marcxml;
535                 $num_exported++;
536             }
537             next;
538         }
539         my ($marc) = get_corrected_marc_record($record_type, $record_number);
540         if (defined $marc) {
541             eval {
542                 my $rec = $marc->as_xml_record(C4::Context->preference('marcflavour'));
543                 eval {
544                     my $doc = $tester->parse_string($rec);
545                 };
546                 if ($@) {
547                     die "invalid XML: $@";
548                 }
549                 $rec =~ s!<\?xml version="1.0" encoding="UTF-8"\?>\n!!;
550                 print {$fh} $rec;
551                 $num_exported++;
552             };
553             if ($@) {
554                 warn "Error exporting record $record_number ($record_type) XML";
555                 warn "... specific error is $@" if $verbose_logging;
556             }
557         }
558     }
559     print "\nRecords exported: $num_exported\n" if ( $verbose_logging );
560     print {$fh} $marcxml_close;
561
562     close $fh;
563     return $num_exported;
564 }
565
566 sub export_marc_records_from_list {
567     my ($record_type, $entries, $directory, $records_deleted) = @_;
568
569     my $num_exported = 0;
570     open my $fh, '>:encoding(UTF-8)', "$directory/exported_records" or die $!;
571
572     print {$fh} $marcxml_open;
573
574     my $i = 0;
575
576     # Skip any deleted records. We check for this anyway, but this reduces error spam
577     my %found = %$records_deleted;
578     foreach my $record_number ( map { $_->{biblio_auth_number} }
579                                 grep { !$found{ $_->{biblio_auth_number} }++ }
580                                 @$entries ) {
581         print "." if ( $verbose_logging );
582         print "\r$i" unless ($i++ %100 or !$verbose_logging);
583         my ($marc) = get_corrected_marc_record($record_type, $record_number);
584         if (defined $marc) {
585             eval {
586                 my $rec = $marc->as_xml_record(C4::Context->preference('marcflavour'));
587                 $rec =~ s!<\?xml version="1.0" encoding="UTF-8"\?>\n!!;
588                 print {$fh} $rec;
589                 $num_exported++;
590             };
591             if ($@) {
592               warn "Error exporting record $record_number ($record_type) XML";
593             }
594         }
595     }
596     print "\nRecords exported: $num_exported\n" if ( $verbose_logging );
597
598     print {$fh} $marcxml_close;
599
600     close $fh;
601     return $num_exported;
602 }
603
604 sub generate_deleted_marc_records {
605
606     my ($record_type, $entries, $directory) = @_;
607
608     my $records_deleted = {};
609     open my $fh, '>:encoding(UTF-8)', "$directory/exported_records" or die $!;
610
611     print {$fh} $marcxml_open;
612
613     my $i = 0;
614     foreach my $record_number (map { $_->{biblio_auth_number} } @$entries ) {
615         print "\r$i" unless ($i++ %100 or !$verbose_logging);
616         print "." if ( $verbose_logging );
617
618         my $marc = MARC::Record->new();
619         if ($record_type eq 'biblio') {
620             fix_biblio_ids($marc, $record_number, $record_number);
621         } else {
622             fix_authority_id($marc, $record_number);
623         }
624         if (C4::Context->preference("marcflavour") eq "UNIMARC") {
625             fix_unimarc_100($marc);
626         }
627
628         my $rec = $marc->as_xml_record(C4::Context->preference('marcflavour'));
629         # Remove the record's XML header
630         $rec =~ s!<\?xml version="1.0" encoding="UTF-8"\?>\n!!;
631         print {$fh} $rec;
632
633         $records_deleted->{$record_number} = 1;
634     }
635     print "\nRecords exported: $i\n" if ( $verbose_logging );
636
637     print {$fh} $marcxml_close;
638
639     close $fh;
640     return $records_deleted;
641 }
642
643 sub get_corrected_marc_record {
644     my ( $record_type, $record_number ) = @_;
645
646     my $marc = get_raw_marc_record( $record_type, $record_number );
647
648     if ( defined $marc ) {
649         fix_leader($marc);
650         if ( $record_type eq 'authority' ) {
651             fix_authority_id( $marc, $record_number );
652         }
653         elsif ( $record_type eq 'biblio' ) {
654
655             my @filters;
656             push @filters, 'EmbedItemsAvailability';
657             push @filters, 'EmbedSeeFromHeadings'
658                 if C4::Context->preference('IncludeSeeFromInSearches');
659
660             my $normalizer = Koha::RecordProcessor->new( { filters => \@filters } );
661             $marc = $normalizer->process($marc);
662         }
663         if ( C4::Context->preference("marcflavour") eq "UNIMARC" ) {
664             fix_unimarc_100($marc);
665         }
666     }
667
668     return $marc;
669 }
670
671 sub get_raw_marc_record {
672     my ($record_type, $record_number) = @_;
673
674     my $marc;
675     if ($record_type eq 'biblio') {
676         eval { $marc = C4::Biblio::GetMarcBiblio({ biblionumber => $record_number, embed_items => 1 }); };
677         if ($@ || !$marc) {
678             # here we do warn since catching an exception
679             # means that the bib was found but failed
680             # to be parsed
681             warn "error retrieving biblio $record_number";
682             return;
683         }
684     } else {
685         eval { $marc = GetAuthority($record_number); };
686         if ($@) {
687             warn "error retrieving authority $record_number";
688             return;
689         }
690     }
691     return $marc;
692 }
693
694 sub fix_leader {
695     # FIXME - this routine is suspect
696     # It blanks the Leader/00-05 and Leader/12-16 to
697     # force them to be recalculated correct when
698     # the $marc->as_usmarc() or $marc->as_xml() is called.
699     # But why is this necessary?  It would be a serious bug
700     # in MARC::Record (definitely) and MARC::File::XML (arguably)
701     # if they are emitting incorrect leader values.
702     my $marc = shift;
703
704     my $leader = $marc->leader;
705     substr($leader,  0, 5) = '     ';
706     substr($leader, 10, 7) = '22     ';
707     $marc->leader(substr($leader, 0, 24));
708 }
709
710 sub fix_biblio_ids {
711     # FIXME - it is essential to ensure that the biblionumber is present,
712     #         otherwise, Zebra will choke on the record.  However, this
713     #         logic belongs in the relevant C4::Biblio APIs.
714     my $marc = shift;
715     my $biblionumber = shift;
716     my $biblioitemnumber;
717     if (@_) {
718         $biblioitemnumber = shift;
719     } else {
720         my $sth = $dbh->prepare(
721             "SELECT biblioitemnumber FROM biblioitems WHERE biblionumber=?");
722         $sth->execute($biblionumber);
723         ($biblioitemnumber) = $sth->fetchrow_array;
724         $sth->finish;
725         unless ($biblioitemnumber) {
726             warn "failed to get biblioitemnumber for biblio $biblionumber";
727             return 0;
728         }
729     }
730
731     # FIXME - this is cheating on two levels
732     # 1. C4::Biblio::_koha_marc_update_bib_ids is meant to be an internal function
733     # 2. Making sure that the biblionumber and biblioitemnumber are correct and
734     #    present in the MARC::Record object ought to be part of GetMarcBiblio.
735     #
736     # On the other hand, this better for now than what rebuild_zebra.pl used to
737     # do, which was duplicate the code for inserting the biblionumber
738     # and biblioitemnumber
739     C4::Biblio::_koha_marc_update_bib_ids($marc, '', $biblionumber, $biblioitemnumber);
740
741     return 1;
742 }
743
744 sub fix_authority_id {
745     # FIXME - as with fix_biblio_ids, the authid must be present
746     #         for Zebra's sake.  However, this really belongs
747     #         in C4::AuthoritiesMarc.
748     my ($marc, $authid) = @_;
749     unless ($marc->field('001') and $marc->field('001')->data() eq $authid){
750         $marc->delete_field($marc->field('001'));
751         $marc->insert_fields_ordered(MARC::Field->new('001',$authid));
752     }
753 }
754
755 sub fix_unimarc_100 {
756     # FIXME - again, if this is necessary, it belongs in C4::AuthoritiesMarc.
757     my $marc = shift;
758
759     my $string;
760     my $length_100a = length($marc->subfield( 100, "a" ));
761     if (  $length_100a and $length_100a == 36 ) {
762         $string = $marc->subfield( 100, "a" );
763         my $f100 = $marc->field(100);
764         $marc->delete_field($f100);
765     }
766     else {
767         $string = POSIX::strftime( "%Y%m%d", localtime );
768         $string =~ s/\-//g;
769         $string = sprintf( "%-*s", 35, $string );
770     }
771     substr( $string, 22, 6, "frey50" );
772     $length_100a = length($marc->subfield( 100, "a" ));
773     unless ( $length_100a and $length_100a == 36 ) {
774         $marc->delete_field($marc->field(100));
775         $marc->insert_grouped_field(MARC::Field->new( 100, "", "", "a" => $string ));
776     }
777 }
778
779 sub do_indexing {
780     my ($record_type, $op, $record_dir, $reset_index, $noshadow, $record_format, $zebraidx_log_opt) = @_;
781
782     my $zebra_server  = ($record_type eq 'biblio') ? 'biblioserver' : 'authorityserver';
783     my $zebra_db_name = ($record_type eq 'biblio') ? 'biblios' : 'authorities';
784     my $zebra_config  = C4::Context->zebraconfig($zebra_server)->{'config'};
785     my $zebra_db_dir  = C4::Context->zebraconfig($zebra_server)->{'directory'};
786
787     $noshadow //= '';
788
789     if ($noshadow or $reset_index) {
790         $noshadow = '-n';
791     }
792
793     system("zebraidx -c $zebra_config $zebraidx_log_opt -g $record_format -d $zebra_db_name init") if $reset_index;
794     system("zebraidx -c $zebra_config $zebraidx_log_opt $noshadow -g $record_format -d $zebra_db_name $op $record_dir");
795     system("zebraidx -c $zebra_config $zebraidx_log_opt -g $record_format -d $zebra_db_name commit") unless $noshadow;
796 }
797
798 sub _flock {
799     # test if flock is present; if so, use it; if not, return true
800     # op refers to the official flock operations including LOCK_EX,
801     # LOCK_UN, etc.
802     # combining LOCK_EX with LOCK_NB returns immediately
803     my ($fh, $op)= @_;
804     if( !defined($use_flock) ) {
805         #check if flock is present; if not, you will have a fatal error
806         my $lock_acquired = eval { flock($fh, $op) };
807         # assuming that $fh and $op are fine(..), an undef $lock_acquired
808         # means no flock
809         $use_flock = defined($lock_acquired) ? 1 : 0;
810         print "Warning: flock could not be used!\n" if $verbose_logging && !$use_flock;
811         return 1 if !$use_flock;
812         return $lock_acquired;
813     } else {
814         return 1 if !$use_flock;
815         return flock($fh, $op);
816     }
817 }
818
819 sub _create_lockfile { #returns undef on failure
820     my $dir= shift;
821     unless (-d $dir) {
822         eval { mkpath($dir, 0, oct(755)) };
823         return if $@;
824     }
825     return if !open my $fh, q{>}, $dir.'/'.LOCK_FILENAME;
826     return ( $fh, $dir.'/'.LOCK_FILENAME );
827 }
828
829 sub print_usage {
830     print <<_USAGE_;
831 $0: reindex MARC bibs and/or authorities in Zebra.
832
833 Use this batch job to reindex all biblio or authority
834 records in your Koha database.
835
836 Parameters:
837
838     -b                      index bibliographic records
839
840     -a                      index authority records
841
842     -daemon                 Run in daemon mode.  The program will loop checking
843                             for entries on the zebraqueue table, processing
844                             them incrementally if present, and then sleep
845                             for a few seconds before repeating the process
846                             Checking the zebraqueue table is done with a cheap
847                             SQL query.  This allows for near realtime update of
848                             the zebra search index with low system overhead.
849                             Use -sleep to control the checking interval.
850
851                             Daemon mode implies -z, -a, -b.  The program will
852                             refuse to start if options are present that do not
853                             make sense while running as an incremental update
854                             daemon (e.g. -r or -offset).
855
856     -sleep 10               Seconds to sleep between checks of the zebraqueue
857                             table in daemon mode.  The default is 5 seconds.
858
859     -z                      select only updated and deleted
860                             records marked in the zebraqueue
861                             table.  Cannot be used with -r
862                             or -s.
863
864     --skip-deletes          only select record updates, not record
865                             deletions, to avoid potential excessive
866                             I/O when zebraidx processes deletions.
867                             If this option is used for normal indexing,
868                             a cronjob should be set up to run
869                             rebuild_zebra.pl -z without --skip-deletes
870                             during off hours.
871                             Only effective with -z.
872
873     -r                      clear Zebra index before
874                             adding records to index. Implies -w.
875
876     -d                      Temporary directory for indexing.
877                             If not specified, one is automatically
878                             created.  The export directory
879                             is automatically deleted unless
880                             you supply the -k switch.
881
882     -k                      Do not delete export directory.
883
884     -s                      Skip export.  Used if you have
885                             already exported the records
886                             in a previous run.
887
888     -nosanitize             export biblio/authority records directly from DB marcxml
889                             field without sanitizing records. It speed up
890                             dump process but could fail if DB contains badly
891                             encoded records. Works only with -x,
892
893     -w                      skip shadow indexing for this batch
894
895     -y                      do NOT clear zebraqueue after indexing; normally,
896                             after doing batch indexing, zebraqueue should be
897                             marked done for the affected record type(s) so that
898                             a running zebraqueue_daemon doesn't try to reindex
899                             the same records - specify -y to override this.
900                             Cannot be used with -z.
901
902     -v                      increase the amount of logging.  Normally only
903                             warnings and errors from the indexing are shown.
904                             Use log level 2 (-v -v) to include all Zebra logs.
905
906     --length   1234         how many biblio you want to export
907     --offset 1243           offset you want to start to
908                                 example: --offset 500 --length=500 will result in a LIMIT 500,1000 (exporting 1000 records, starting by the 500th one)
909                                 note that the numbers are NOT related to biblionumber, that's the intended behaviour.
910     --where                 let you specify a WHERE query, like itemtype='BOOK'
911                             or something like that
912
913     --run-as-root           explicitily allow script to run as 'root' user
914
915     --wait-for-lock         when not running in daemon mode, the default
916                             behavior is to abort a rebuild if the rebuild
917                             lock is busy.  This option will cause the program
918                             to wait for the lock to free and then continue
919                             processing the rebuild request,
920
921     --table                 specify a table (can be items, biblioitems, biblio, biblio_metadata) to retrieve biblionumber to index.
922                             biblioitems is the default value.
923
924     --help or -h            show this message.
925 _USAGE_
926 }