04cf45428dfb77055fc1b30cd3b4ae0c5bb866e5
[koha-equinox.git] / misc / migration_tools / rebuild_zebra.pl
1 #!/usr/bin/perl
2
3 use strict;
4 #use warnings; FIXME - Bug 2505
5
6 use C4::Context;
7 use Getopt::Long;
8 use File::Temp qw/ tempdir /;
9 use File::Path;
10 use C4::Biblio;
11 use C4::AuthoritiesMarc;
12 use C4::Items;
13 use Koha::RecordProcessor;
14 use XML::LibXML;
15
16 # script that checks zebradir structure & create directories & mandatory files if needed
17 #
18 #
19
20 $|=1; # flushes output
21 # If the cron job starts us in an unreadable dir, we will break without
22 # this.
23 chdir $ENV{HOME} if (!(-r '.'));
24 my $daemon_mode;
25 my $daemon_sleep = 5;
26 my $directory;
27 my $nosanitize;
28 my $skip_export;
29 my $keep_export;
30 my $skip_index;
31 my $reset;
32 my $biblios;
33 my $authorities;
34 my $noxml;
35 my $noshadow;
36 my $want_help;
37 my $as_xml;
38 my $process_zebraqueue;
39 my $do_not_clear_zebraqueue;
40 my $length;
41 my $where;
42 my $offset;
43 my $run_as_root;
44 my $run_user = (getpwuid($<))[0];
45
46 my $verbose_logging = 0;
47 my $zebraidx_log_opt = " -v none,fatal,warn ";
48 my $result = GetOptions(
49     'daemon'        => \$daemon_mode,
50     'sleep:i'       => \$daemon_sleep,
51     'd:s'           => \$directory,
52     'r|reset'       => \$reset,
53     's'             => \$skip_export,
54     'k'             => \$keep_export,
55     'I|skip-index'  => \$skip_index,
56     'nosanitize'    => \$nosanitize,
57     'b'             => \$biblios,
58     'noxml'         => \$noxml,
59     'w'             => \$noshadow,
60     'a'             => \$authorities,
61     'h|help'        => \$want_help,
62     'x'             => \$as_xml,
63     'y'             => \$do_not_clear_zebraqueue,
64     'z'             => \$process_zebraqueue,
65     'where:s'        => \$where,
66     'length:i'        => \$length,
67     'offset:i'      => \$offset,
68     'v+'             => \$verbose_logging,
69     'run-as-root'    => \$run_as_root,
70 );
71
72 if (not $result or $want_help) {
73     print_usage();
74     exit 0;
75 }
76
77 if( not defined $run_as_root and $run_user eq 'root') {
78     my $msg = "Warning: You are running this script as the user 'root'.\n";
79     $msg   .= "If this is intentional you must explicitly specify this using the -run-as-root switch\n";
80     $msg   .= "Please do '$0 --help' to see usage.\n";
81     die $msg;
82 }
83
84 if ( !$as_xml and $nosanitize ) {
85     my $msg = "Cannot specify both -no_xml and -nosanitize\n";
86     $msg   .= "Please do '$0 --help' to see usage.\n";
87     die $msg;
88 }
89
90 if ($process_zebraqueue and ($skip_export or $reset)) {
91     my $msg = "Cannot specify -r or -s if -z is specified\n";
92     $msg   .= "Please do '$0 --help' to see usage.\n";
93     die $msg;
94 }
95
96 if ($process_zebraqueue and $do_not_clear_zebraqueue) {
97     my $msg = "Cannot specify both -y and -z\n";
98     $msg   .= "Please do '$0 --help' to see usage.\n";
99     die $msg;
100 }
101
102 if ($reset) {
103     $noshadow = 1;
104 }
105
106 if ($noshadow) {
107     $noshadow = ' -n ';
108 }
109
110 if ($daemon_mode) {
111     # incompatible flags handled above: help, reset, and do_not_clear_zebraqueue
112     if ($skip_export or $keep_export or $skip_index or
113           $where or $length or $offset) {
114         my $msg = "Cannot specify -s, -k, -I, -where, -length, or -offset with -daemon.\n";
115         $msg   .= "Please do '$0 --help' to see usage.\n";
116         die $msg;
117     }
118     $authorities = 1;
119     $biblios = 1;
120     $process_zebraqueue = 1;
121 }
122
123 if (not $biblios and not $authorities) {
124     my $msg = "Must specify -b or -a to reindex bibs or authorities\n";
125     $msg   .= "Please do '$0 --help' to see usage.\n";
126     die $msg;
127 }
128
129
130 #  -v is for verbose, which seems backwards here because of how logging is set
131 #    on the CLI of zebraidx.  It works this way.  The default is to not log much
132 if ($verbose_logging >= 2) {
133     $zebraidx_log_opt = '-v none,fatal,warn,all';
134 }
135
136 my $use_tempdir = 0;
137 unless ($directory) {
138     $use_tempdir = 1;
139     $directory = tempdir(CLEANUP => ($keep_export ? 0 : 1));
140 }
141
142
143 my $biblioserverdir = C4::Context->zebraconfig('biblioserver')->{directory};
144 my $authorityserverdir = C4::Context->zebraconfig('authorityserver')->{directory};
145
146 my $kohadir = C4::Context->config('intranetdir');
147 my $bib_index_mode = C4::Context->config('zebra_bib_index_mode') || 'grs1';
148 my $auth_index_mode = C4::Context->config('zebra_auth_index_mode') || 'dom';
149
150 my $dbh = C4::Context->dbh;
151 my ($biblionumbertagfield,$biblionumbertagsubfield) = &GetMarcFromKohaField("biblio.biblionumber","");
152 my ($biblioitemnumbertagfield,$biblioitemnumbertagsubfield) = &GetMarcFromKohaField("biblioitems.biblioitemnumber","");
153
154 if ( $verbose_logging ) {
155     print "Zebra configuration information\n";
156     print "================================\n";
157     print "Zebra biblio directory      = $biblioserverdir\n";
158     print "Zebra authorities directory = $authorityserverdir\n";
159     print "Koha directory              = $kohadir\n";
160     print "BIBLIONUMBER in :     $biblionumbertagfield\$$biblionumbertagsubfield\n";
161     print "BIBLIOITEMNUMBER in : $biblioitemnumbertagfield\$$biblioitemnumbertagsubfield\n";
162     print "================================\n";
163 }
164
165 my $tester = XML::LibXML->new();
166
167 if ($daemon_mode) {
168     while (1) {
169         do_one_pass() if ( zebraqueue_not_empty() );
170         sleep $daemon_sleep;
171     }
172 } else {
173     do_one_pass();
174 }
175
176
177 if ( $verbose_logging ) {
178     print "====================\n";
179     print "CLEANING\n";
180     print "====================\n";
181 }
182 if ($keep_export) {
183     print "NOTHING cleaned : the export $directory has been kept.\n";
184     print "You can re-run this script with the -s ";
185     if ($use_tempdir) {
186         print " and -d $directory parameters";
187     } else {
188         print "parameter";
189     }
190     print "\n";
191     print "if you just want to rebuild zebra after changing the record.abs\n";
192     print "or another zebra config file\n";
193 } else {
194     unless ($use_tempdir) {
195         # if we're using a temporary directory
196         # created by File::Temp, it will be removed
197         # automatically.
198         rmtree($directory, 0, 1);
199         print "directory $directory deleted\n";
200     }
201 }
202
203 sub do_one_pass {
204     if ($authorities) {
205         index_records('authority', $directory, $skip_export, $skip_index, $process_zebraqueue, $as_xml, $noxml, $nosanitize, $do_not_clear_zebraqueue, $verbose_logging, $zebraidx_log_opt, $authorityserverdir);
206     } else {
207         print "skipping authorities\n" if ( $verbose_logging );
208     }
209
210     if ($biblios) {
211         index_records('biblio', $directory, $skip_export, $skip_index, $process_zebraqueue, $as_xml, $noxml, $nosanitize, $do_not_clear_zebraqueue, $verbose_logging, $zebraidx_log_opt, $biblioserverdir);
212     } else {
213         print "skipping biblios\n" if ( $verbose_logging );
214     }
215 }
216
217 # Check the zebra update queue and return true if there are records to process
218 # This routine will handle each of -ab, -a, or -b, but in practice we force
219 # -ab when in daemon mode.
220 sub zebraqueue_not_empty {
221     my $where_str;
222
223     if ($authorities && $biblios) {
224         $where_str = 'done = 0;';
225     } elsif ($biblios) {
226         $where_str = 'server = "biblioserver" AND done = 0;';
227     } else {
228         $where_str = 'server = "authorityserver" AND done = 0;';
229     }
230     my $query =
231       $dbh->prepare( 'SELECT COUNT(*) FROM zebraqueue WHERE ' . $where_str );
232
233     $query->execute;
234     my $count = $query->fetchrow_arrayref->[0];
235     print "queued records: $count\n" if $verbose_logging > 0;
236     return $count > 0;
237 }
238
239 # This checks to see if the zebra directories exist under the provided path.
240 # If they don't, then zebra is likely to spit the dummy. This returns true
241 # if the directories had to be created, false otherwise.
242 sub check_zebra_dirs {
243     my ($base) = shift() . '/';
244     my $needed_repairing = 0;
245     my @dirs = ( '', 'key', 'register', 'shadow', 'tmp' );
246     foreach my $dir (@dirs) {
247         my $bdir = $base . $dir;
248         if (! -d $bdir) {
249             $needed_repairing = 1;
250             mkdir $bdir || die "Unable to create '$bdir': $!\n";
251             print "$0: needed to create '$bdir'\n";
252         }
253     }
254     return $needed_repairing;
255 }   # ----------  end of subroutine check_zebra_dirs  ----------
256
257 sub index_records {
258     my ($record_type, $directory, $skip_export, $skip_index, $process_zebraqueue, $as_xml, $noxml, $nosanitize, $do_not_clear_zebraqueue, $verbose_logging, $zebraidx_log_opt, $server_dir) = @_;
259
260     my $num_records_exported = 0;
261     my $records_deleted;
262     my $need_reset = check_zebra_dirs($server_dir);
263     if ($need_reset) {
264         print "$0: found broken zebra server directories: forcing a rebuild\n";
265         $reset = 1;
266     }
267     if ($skip_export && $verbose_logging) {
268         print "====================\n";
269         print "SKIPPING $record_type export\n";
270         print "====================\n";
271     } else {
272         if ( $verbose_logging ) {
273             print "====================\n";
274             print "exporting $record_type\n";
275             print "====================\n";
276         }
277         mkdir "$directory" unless (-d $directory);
278         mkdir "$directory/$record_type" unless (-d "$directory/$record_type");
279         if ($process_zebraqueue) {
280             my $entries = select_zebraqueue_records($record_type, 'deleted');
281             mkdir "$directory/del_$record_type" unless (-d "$directory/del_$record_type");
282             $records_deleted = generate_deleted_marc_records($record_type, $entries, "$directory/del_$record_type", $as_xml);
283             mark_zebraqueue_batch_done($entries);
284             $entries = select_zebraqueue_records($record_type, 'updated');
285             mkdir "$directory/upd_$record_type" unless (-d "$directory/upd_$record_type");
286             $num_records_exported = export_marc_records_from_list($record_type,
287                                                                   $entries, "$directory/upd_$record_type", $as_xml, $noxml, $records_deleted);
288             mark_zebraqueue_batch_done($entries);
289         } else {
290             my $sth = select_all_records($record_type);
291             $num_records_exported = export_marc_records_from_sth($record_type, $sth, "$directory/$record_type", $as_xml, $noxml, $nosanitize);
292             unless ($do_not_clear_zebraqueue) {
293                 mark_all_zebraqueue_done($record_type);
294             }
295         }
296     }
297
298     #
299     # and reindexing everything
300     #
301     if ($skip_index) {
302         if ($verbose_logging) {
303             print "====================\n";
304             print "SKIPPING $record_type indexing\n";
305             print "====================\n";
306         }
307     } else {
308         if ( $verbose_logging ) {
309             print "====================\n";
310             print "REINDEXING zebra\n";
311             print "====================\n";
312         }
313         my $record_fmt = ($as_xml) ? 'marcxml' : 'iso2709' ;
314         if ($process_zebraqueue) {
315             do_indexing($record_type, 'adelete', "$directory/del_$record_type", $reset, $noshadow, $record_fmt, $zebraidx_log_opt)
316                 if %$records_deleted;
317             do_indexing($record_type, 'update', "$directory/upd_$record_type", $reset, $noshadow, $record_fmt, $zebraidx_log_opt)
318                 if $num_records_exported;
319         } else {
320             do_indexing($record_type, 'update', "$directory/$record_type", $reset, $noshadow, $record_fmt, $zebraidx_log_opt)
321                 if ($num_records_exported or $skip_export);
322         }
323     }
324 }
325
326
327 sub select_zebraqueue_records {
328     my ($record_type, $update_type) = @_;
329
330     my $server = ($record_type eq 'biblio') ? 'biblioserver' : 'authorityserver';
331     my $op = ($update_type eq 'deleted') ? 'recordDelete' : 'specialUpdate';
332
333     my $sth = $dbh->prepare("SELECT id, biblio_auth_number
334                              FROM zebraqueue
335                              WHERE server = ?
336                              AND   operation = ?
337                              AND   done = 0
338                              ORDER BY id DESC");
339     $sth->execute($server, $op);
340     my $entries = $sth->fetchall_arrayref({});
341 }
342
343 sub mark_all_zebraqueue_done {
344     my ($record_type) = @_;
345
346     my $server = ($record_type eq 'biblio') ? 'biblioserver' : 'authorityserver';
347
348     my $sth = $dbh->prepare("UPDATE zebraqueue SET done = 1
349                              WHERE server = ?
350                              AND done = 0");
351     $sth->execute($server);
352 }
353
354 sub mark_zebraqueue_batch_done {
355     my ($entries) = @_;
356
357     $dbh->{AutoCommit} = 0;
358     my $sth = $dbh->prepare("UPDATE zebraqueue SET done = 1 WHERE id = ?");
359     $dbh->commit();
360     foreach my $id (map { $_->{id} } @$entries) {
361         $sth->execute($id);
362     }
363     $dbh->{AutoCommit} = 1;
364 }
365
366 sub select_all_records {
367     my $record_type = shift;
368     return ($record_type eq 'biblio') ? select_all_biblios() : select_all_authorities();
369 }
370
371 sub select_all_authorities {
372     my $strsth=qq{SELECT authid FROM auth_header};
373     $strsth.=qq{ WHERE $where } if ($where);
374     $strsth.=qq{ LIMIT $length } if ($length && !$offset);
375     $strsth.=qq{ LIMIT $offset,$length } if ($length && $offset);
376     my $sth = $dbh->prepare($strsth);
377     $sth->execute();
378     return $sth;
379 }
380
381 sub select_all_biblios {
382     my $strsth = qq{ SELECT biblionumber FROM biblioitems };
383     $strsth.=qq{ WHERE $where } if ($where);
384     $strsth.=qq{ LIMIT $length } if ($length && !$offset);
385     $strsth.=qq{ LIMIT $offset,$length } if ($offset);
386     my $sth = $dbh->prepare($strsth);
387     $sth->execute();
388     return $sth;
389 }
390
391 sub include_xml_wrapper {
392     my $as_xml = shift;
393     my $record_type = shift;
394
395     return 0 unless $as_xml;
396     return 1 if $record_type eq 'biblio' and $bib_index_mode eq 'dom';
397     return 1 if $record_type eq 'authority' and $auth_index_mode eq 'dom';
398     return 0;
399
400 }
401
402 sub export_marc_records_from_sth {
403     my ($record_type, $sth, $directory, $as_xml, $noxml, $nosanitize) = @_;
404
405     my $num_exported = 0;
406     open my $fh, '>:encoding(UTF-8) ', "$directory/exported_records" or die $!;
407     if (include_xml_wrapper($as_xml, $record_type)) {
408         # include XML declaration and root element
409         print {$fh} '<?xml version="1.0" encoding="UTF-8"?><collection>';
410     }
411     my $i = 0;
412     my ( $itemtag, $itemsubfield ) = GetMarcFromKohaField("items.itemnumber",'');
413     while (my ($record_number) = $sth->fetchrow_array) {
414         print "." if ( $verbose_logging );
415         print "\r$i" unless ($i++ %100 or !$verbose_logging);
416         if ( $nosanitize ) {
417             my $marcxml = $record_type eq 'biblio'
418                           ? GetXmlBiblio( $record_number )
419                           : GetAuthorityXML( $record_number );
420             if ($record_type eq 'biblio'){
421                 my @items = GetItemsInfo($record_number);
422                 if (@items){
423                     my $record = MARC::Record->new;
424                     $record->encoding('UTF-8');
425                     my @itemsrecord;
426                     foreach my $item (@items){
427                         my $record = Item2Marc($item, $record_number);
428                         push @itemsrecord, $record->field($itemtag);
429                     }
430                     $record->insert_fields_ordered(@itemsrecord);
431                     my $itemsxml = $record->as_xml_record();
432                     $marcxml =
433                         substr($marcxml, 0, length($marcxml)-10) .
434                         substr($itemsxml, index($itemsxml, "</leader>\n", 0) + 10);
435                 }
436             }
437             # extra test to ensure that result is valid XML; otherwise
438             # Zebra won't parse it in DOM mode
439             eval {
440                 my $doc = $tester->parse_string($marcxml);
441             };
442             if ($@) {
443                 warn "Error exporting record $record_number ($record_type): $@\n";
444                 next;
445             }
446             if ( $marcxml ) {
447                 $marcxml =~ s!<\?xml version="1.0" encoding="UTF-8"\?>\n!!;
448                 print {$fh} $marcxml;
449                 $num_exported++;
450             }
451             next;
452         }
453         my ($marc) = get_corrected_marc_record($record_type, $record_number, $noxml);
454         if (defined $marc) {
455             eval {
456                 my $rec;
457                 if ($as_xml) {
458                     $rec = $marc->as_xml_record(C4::Context->preference('marcflavour'));
459                     eval {
460                         my $doc = $tester->parse_string($rec);
461                     };
462                     if ($@) {
463                         die "invalid XML: $@";
464                     }
465                     $rec =~ s!<\?xml version="1.0" encoding="UTF-8"\?>\n!!;
466                 } else {
467                     $rec = $marc->as_usmarc();
468                 }
469                 print {$fh} $rec;
470                 $num_exported++;
471             };
472             if ($@) {
473                 warn "Error exporting record $record_number ($record_type) ".($noxml ? "not XML" : "XML");
474                 warn "... specific error is $@" if $verbose_logging;
475             }
476         }
477     }
478     print "\nRecords exported: $num_exported\n" if ( $verbose_logging );
479     print {$fh} '</collection>' if (include_xml_wrapper($as_xml, $record_type));
480     close $fh;
481     return $num_exported;
482 }
483
484 sub export_marc_records_from_list {
485     my ($record_type, $entries, $directory, $as_xml, $noxml, $records_deleted) = @_;
486
487     my $num_exported = 0;
488     open my $fh, '>:encoding(UTF-8)', "$directory/exported_records" or die $!;
489     if (include_xml_wrapper($as_xml, $record_type)) {
490         # include XML declaration and root element
491         print {$fh} '<?xml version="1.0" encoding="UTF-8"?><collection>';
492     }
493     my $i = 0;
494
495     # Skip any deleted records. We check for this anyway, but this reduces error spam
496     my %found = %$records_deleted;
497     foreach my $record_number ( map { $_->{biblio_auth_number} }
498                                 grep { !$found{ $_->{biblio_auth_number} }++ }
499                                 @$entries ) {
500         print "." if ( $verbose_logging );
501         print "\r$i" unless ($i++ %100 or !$verbose_logging);
502         my ($marc) = get_corrected_marc_record($record_type, $record_number, $noxml);
503         if (defined $marc) {
504             eval {
505                 my $rec;
506                 if ($as_xml) {
507                     $rec = $marc->as_xml_record(C4::Context->preference('marcflavour'));
508                     $rec =~ s!<\?xml version="1.0" encoding="UTF-8"\?>\n!!;
509                 } else {
510                     $rec = $marc->as_usmarc();
511                 }
512                 print {$fh} $rec;
513                 $num_exported++;
514             };
515             if ($@) {
516               warn "Error exporting record $record_number ($record_type) ".($noxml ? "not XML" : "XML");
517             }
518         }
519     }
520     print "\nRecords exported: $num_exported\n" if ( $verbose_logging );
521     print {$fh} '</collection>' if (include_xml_wrapper($as_xml, $record_type));
522     close $fh;
523     return $num_exported;
524 }
525
526 sub generate_deleted_marc_records {
527     my ($record_type, $entries, $directory, $as_xml) = @_;
528
529     my $records_deleted = {};
530     open my $fh, '>:encoding(UTF-8)', "$directory/exported_records" or die $!;
531     if (include_xml_wrapper($as_xml, $record_type)) {
532         # include XML declaration and root element
533         print {$fh} '<?xml version="1.0" encoding="UTF-8"?><collection>';
534     }
535     my $i = 0;
536     foreach my $record_number (map { $_->{biblio_auth_number} } @$entries ) {
537         print "\r$i" unless ($i++ %100 or !$verbose_logging);
538         print "." if ( $verbose_logging );
539
540         my $marc = MARC::Record->new();
541         if ($record_type eq 'biblio') {
542             fix_biblio_ids($marc, $record_number, $record_number);
543         } else {
544             fix_authority_id($marc, $record_number);
545         }
546         if (C4::Context->preference("marcflavour") eq "UNIMARC") {
547             fix_unimarc_100($marc);
548         }
549
550         my $rec;
551         if ($as_xml) {
552             $rec = $marc->as_xml_record(C4::Context->preference('marcflavour'));
553             $rec =~ s!<\?xml version="1.0" encoding="UTF-8"\?>\n!!;
554         } else {
555             $rec = $marc->as_usmarc();
556         }
557         print {$fh} $rec;
558
559         $records_deleted->{$record_number} = 1;
560     }
561     print "\nRecords exported: $i\n" if ( $verbose_logging );
562     print {$fh} '</collection>' if (include_xml_wrapper($as_xml, $record_type));
563     close $fh;
564     return $records_deleted;
565
566
567 }
568
569 sub get_corrected_marc_record {
570     my ($record_type, $record_number, $noxml) = @_;
571
572     my $marc = get_raw_marc_record($record_type, $record_number, $noxml);
573
574     if (defined $marc) {
575         fix_leader($marc);
576         if ($record_type eq 'authority') {
577             fix_authority_id($marc, $record_number);
578         } elsif ($record_type eq 'biblio' && C4::Context->preference('IncludeSeeFromInSearches')) {
579             my $normalizer = Koha::RecordProcessor->new( { filters => 'EmbedSeeFromHeadings' } );
580             $marc = $normalizer->process($marc);
581         }
582         if (C4::Context->preference("marcflavour") eq "UNIMARC") {
583             fix_unimarc_100($marc);
584         }
585     }
586
587     return $marc;
588 }
589
590 sub get_raw_marc_record {
591     my ($record_type, $record_number, $noxml) = @_;
592
593     my $marc;
594     if ($record_type eq 'biblio') {
595         if ($noxml) {
596             my $fetch_sth = $dbh->prepare_cached("SELECT marc FROM biblioitems WHERE biblionumber = ?");
597             $fetch_sth->execute($record_number);
598             if (my ($blob) = $fetch_sth->fetchrow_array) {
599                 $marc = MARC::Record->new_from_usmarc($blob);
600                 unless ($marc) {
601                     warn "error creating MARC::Record from $blob";
602                 }
603             }
604             # failure to find a bib is not a problem -
605             # a delete could have been done before
606             # trying to process a record update
607
608             $fetch_sth->finish();
609             return unless $marc;
610         } else {
611             eval { $marc = GetMarcBiblio($record_number, 1); };
612             if ($@ || !$marc) {
613                 # here we do warn since catching an exception
614                 # means that the bib was found but failed
615                 # to be parsed
616                 warn "error retrieving biblio $record_number";
617                 return;
618             }
619         }
620     } else {
621         eval { $marc = GetAuthority($record_number); };
622         if ($@) {
623             warn "error retrieving authority $record_number";
624             return;
625         }
626     }
627     return $marc;
628 }
629
630 sub fix_leader {
631     # FIXME - this routine is suspect
632     # It blanks the Leader/00-05 and Leader/12-16 to
633     # force them to be recalculated correct when
634     # the $marc->as_usmarc() or $marc->as_xml() is called.
635     # But why is this necessary?  It would be a serious bug
636     # in MARC::Record (definitely) and MARC::File::XML (arguably)
637     # if they are emitting incorrect leader values.
638     my $marc = shift;
639
640     my $leader = $marc->leader;
641     substr($leader,  0, 5) = '     ';
642     substr($leader, 10, 7) = '22     ';
643     $marc->leader(substr($leader, 0, 24));
644 }
645
646 sub fix_biblio_ids {
647     # FIXME - it is essential to ensure that the biblionumber is present,
648     #         otherwise, Zebra will choke on the record.  However, this
649     #         logic belongs in the relevant C4::Biblio APIs.
650     my $marc = shift;
651     my $biblionumber = shift;
652     my $biblioitemnumber;
653     if (@_) {
654         $biblioitemnumber = shift;
655     } else {
656         my $sth = $dbh->prepare(
657             "SELECT biblioitemnumber FROM biblioitems WHERE biblionumber=?");
658         $sth->execute($biblionumber);
659         ($biblioitemnumber) = $sth->fetchrow_array;
660         $sth->finish;
661         unless ($biblioitemnumber) {
662             warn "failed to get biblioitemnumber for biblio $biblionumber";
663             return 0;
664         }
665     }
666
667     # FIXME - this is cheating on two levels
668     # 1. C4::Biblio::_koha_marc_update_bib_ids is meant to be an internal function
669     # 2. Making sure that the biblionumber and biblioitemnumber are correct and
670     #    present in the MARC::Record object ought to be part of GetMarcBiblio.
671     #
672     # On the other hand, this better for now than what rebuild_zebra.pl used to
673     # do, which was duplicate the code for inserting the biblionumber
674     # and biblioitemnumber
675     C4::Biblio::_koha_marc_update_bib_ids($marc, '', $biblionumber, $biblioitemnumber);
676
677     return 1;
678 }
679
680 sub fix_authority_id {
681     # FIXME - as with fix_biblio_ids, the authid must be present
682     #         for Zebra's sake.  However, this really belongs
683     #         in C4::AuthoritiesMarc.
684     my ($marc, $authid) = @_;
685     unless ($marc->field('001') and $marc->field('001')->data() eq $authid){
686         $marc->delete_field($marc->field('001'));
687         $marc->insert_fields_ordered(MARC::Field->new('001',$authid));
688     }
689 }
690
691 sub fix_unimarc_100 {
692     # FIXME - again, if this is necessary, it belongs in C4::AuthoritiesMarc.
693     my $marc = shift;
694
695     my $string;
696     if ( length($marc->subfield( 100, "a" )) == 36 ) {
697         $string = $marc->subfield( 100, "a" );
698         my $f100 = $marc->field(100);
699         $marc->delete_field($f100);
700     }
701     else {
702         $string = POSIX::strftime( "%Y%m%d", localtime );
703         $string =~ s/\-//g;
704         $string = sprintf( "%-*s", 35, $string );
705     }
706     substr( $string, 22, 6, "frey50" );
707     unless ( length($marc->subfield( 100, "a" )) == 36 ) {
708         $marc->delete_field($marc->field(100));
709         $marc->insert_grouped_field(MARC::Field->new( 100, "", "", "a" => $string ));
710     }
711 }
712
713 sub do_indexing {
714     my ($record_type, $op, $record_dir, $reset_index, $noshadow, $record_format, $zebraidx_log_opt) = @_;
715
716     my $zebra_server  = ($record_type eq 'biblio') ? 'biblioserver' : 'authorityserver';
717     my $zebra_db_name = ($record_type eq 'biblio') ? 'biblios' : 'authorities';
718     my $zebra_config  = C4::Context->zebraconfig($zebra_server)->{'config'};
719     my $zebra_db_dir  = C4::Context->zebraconfig($zebra_server)->{'directory'};
720
721     system("zebraidx -c $zebra_config $zebraidx_log_opt -g $record_format -d $zebra_db_name init") if $reset_index;
722     system("zebraidx -c $zebra_config $zebraidx_log_opt $noshadow -g $record_format -d $zebra_db_name $op $record_dir");
723     system("zebraidx -c $zebra_config $zebraidx_log_opt -g $record_format -d $zebra_db_name commit") unless $noshadow;
724
725 }
726
727 sub print_usage {
728     print <<_USAGE_;
729 $0: reindex MARC bibs and/or authorities in Zebra.
730
731 Use this batch job to reindex all biblio or authority
732 records in your Koha database.
733
734 Parameters:
735
736     -b                      index bibliographic records
737
738     -a                      index authority records
739
740     -daemon                 Run in daemon mode.  The program will loop checking
741                             for entries on the zebraqueue table, processing
742                             them incrementally if present, and then sleep
743                             for a few seconds before repeating the process
744                             Checking the zebraqueue table is done with a cheap
745                             SQL query.  This allows for near realtime update of
746                             the zebra search index with low system overhead.
747                             Use -sleep to control the checking interval.
748
749                             Daemon mode implies -z, -a, -b.  The program will
750                             refuse to start if options are present that do not
751                             make sense while running as an incremental update
752                             daemon (e.g. -r or -offset).
753
754     -sleep 10               Seconds to sleep between checks of the zebraqueue
755                             table in daemon mode.  The default is 5 seconds.
756
757     -z                      select only updated and deleted
758                             records marked in the zebraqueue
759                             table.  Cannot be used with -r
760                             or -s.
761
762     -r                      clear Zebra index before
763                             adding records to index. Implies -w.
764
765     -d                      Temporary directory for indexing.
766                             If not specified, one is automatically
767                             created.  The export directory
768                             is automatically deleted unless
769                             you supply the -k switch.
770
771     -k                      Do not delete export directory.
772
773     -s                      Skip export.  Used if you have
774                             already exported the records
775                             in a previous run.
776
777     -noxml                  index from ISO MARC blob
778                             instead of MARC XML.  This
779                             option is recommended only
780                             for advanced user.
781
782     -x                      export and index as xml instead of is02709 (biblios only).
783                             use this if you might have records > 99,999 chars,
784
785     -nosanitize             export biblio/authority records directly from DB marcxml
786                             field without sanitizing records. It speed up
787                             dump process but could fail if DB contains badly
788                             encoded records. Works only with -x,
789
790     -w                      skip shadow indexing for this batch
791
792     -y                      do NOT clear zebraqueue after indexing; normally,
793                             after doing batch indexing, zebraqueue should be
794                             marked done for the affected record type(s) so that
795                             a running zebraqueue_daemon doesn't try to reindex
796                             the same records - specify -y to override this.
797                             Cannot be used with -z.
798
799     -v                      increase the amount of logging.  Normally only
800                             warnings and errors from the indexing are shown.
801                             Use log level 2 (-v -v) to include all Zebra logs.
802
803     --length   1234         how many biblio you want to export
804     --offset 1243           offset you want to start to
805                                 example: --offset 500 --length=500 will result in a LIMIT 500,1000 (exporting 1000 records, starting by the 500th one)
806                                 note that the numbers are NOT related to biblionumber, that's the intended behaviour.
807     --where                 let you specify a WHERE query, like itemtype='BOOK'
808                             or something like that
809
810     --run-as-root           explicitily allow script to run as 'root' user
811
812     --help or -h            show this message.
813 _USAGE_
814 }