=head2 get_all_biblios_iterator
- my $it = Koha::BiblioUtils->get_all_biblios_iterator();
+ my $it = Koha::BiblioUtils->get_all_biblios_iterator(%options);
This will provide an iterator object that will, one by one, provide the
Koha::BiblioUtils of each biblio. This will include the item data.
The iterator is a Koha::MetadataIterator object.
+Possible options are:
+
+=over 4
+
+=item C<slice>
+
+slice may be defined as a hash of two values: index and count. index
+is the slice number to process and count is total number of slices.
+With this information the iterator returns just the given slice of
+records instead of all.
+
+=back
+
=cut
sub get_all_biblios_iterator {
+ my ($self, %options) = @_;
+
+ my $search_terms = {};
+ my ($slice_modulo, $slice_count);
+ if ($options{slice}) {
+ $slice_count = $options{slice}->{count};
+ $slice_modulo = $options{slice}->{index};
+ $slice_modulo = 0 if ($slice_modulo == $slice_count);
+
+ $search_terms = \[ ' mod(biblionumber, ?) = ?', $slice_count, $slice_modulo];
+ }
+
my $database = Koha::Database->new();
my $schema = $database->schema();
my $rs =
- $schema->resultset('Biblio')->search( {},
+ $schema->resultset('Biblio')->search(
+ $search_terms,
{ columns => [qw/ biblionumber /] } );
my $next_func = sub {
# Warn and skip bad records, otherwise we break the loop
=head2 get_all_authorities_iterator
- my $it = Koha::MetadataRecord::Authority->get_all_authorities_iterator();
+ my $it = Koha::MetadataRecord::Authority->get_all_authorities_iterator(%options);
This will provide an iterator object that will, one by one, provide the
Koha::MetadataRecord::Authority of each authority.
The iterator is a Koha::MetadataIterator object.
+Possible options are:
+
+=over 4
+
+=item C<slice>
+
+slice may be defined as a hash of two values: index and count. index
+is the slice number to process and count is total number of slices.
+With this information the iterator returns just the given slice of
+records instead of all.
+
+=back
+
=cut
sub get_all_authorities_iterator {
+ my ($self, %options) = @_;
+
+ my $search_terms = {
+ marcxml => { '!=', undef }
+ };
+ my ($slice_modulo, $slice_count);
+ if ($options{slice}) {
+ $slice_count = $options{slice}->{count};
+ $slice_modulo = $options{slice}->{index};
+ $slice_modulo = 0 if ($slice_modulo == $slice_count);
+
+ $search_terms->{authid} = \[ ' mod ? = ?', $slice_count, $slice_modulo];
+ $search_terms = {
+ '-and' => [
+ $search_terms,
+ \[ ' mod(authid, ?) = ?', $slice_count, $slice_modulo]
+ ]
+ };
+ }
+
my $database = Koha::Database->new();
my $schema = $database->schema();
my $rs =
- $schema->resultset('AuthHeader')->search( { marcxml => { '!=', undef } },
+ $schema->resultset('AuthHeader')->search(
+ $search_terms,
{ columns => [qw/ authid authtypecode marcxml /] } );
my $next_func = sub {
my $row = $rs->next();
repeated. This also applies to authorities via authid, so if you're using it,
you probably only want to do one or the other at a time.
+=item B<-p|--processes>
+
+Number of processes to use for indexing. This can be used to do more indexing
+work in parallel on multicore systems. By default, a single process is used.
+
=item B<-v|--verbose>
By default, this program only emits warnings and errors. This makes it talk
=back
+=head1 IMPLEMENTATION
+
=cut
use autodie;
my $verbose = 0;
my $commit = 5000;
-my ($delete, $help, $man);
+my ($delete, $help, $man, $processes);
my ($index_biblios, $index_authorities);
-my (@biblionumbers);
+my (@record_numbers);
$|=1; # flushes output
GetOptions(
- 'c|commit=i' => \$commit,
- 'd|delete' => \$delete,
+ 'c|commit=i' => \$commit,
+ 'd|delete' => \$delete,
'a|authorities' => \$index_authorities,
- 'b|biblios' => \$index_biblios,
- 'bn|bnumber=i' => \@biblionumbers,
- 'v|verbose+' => \$verbose,
- 'h|help' => \$help,
- 'man' => \$man,
+ 'b|biblios' => \$index_biblios,
+ 'bn|bnumber=i' => \@record_numbers,
+ 'p|processes=i' => \$processes,
+ 'v|verbose+' => \$verbose,
+ 'h|help' => \$help,
+ 'man' => \$man,
);
# Default is to do both
pod2usage(1) if $help;
pod2usage( -exitstatus => 0, -verbose => 2 ) if $man;
-sanity_check();
+_sanity_check();
+
+_verify_index_state($Koha::SearchEngine::Elasticsearch::BIBLIOS_INDEX, $delete) if ($index_biblios);
+_verify_index_state($Koha::SearchEngine::Elasticsearch::AUTHORITIES_INDEX, $delete) if ($index_authorities);
+
+my $slice_index = 0;
+my $slice_count = $processes // 1;
+
+if ($slice_count > 1) {
+ # Fire up child processes for processing slices from 2 on. This main process will handle slice 1.
+ $slice_index = 1;
+ for (my $proc = 2; $proc <= $processes; $proc++) {
+ my $pid = fork();
+ die "Failed to fork a child process\n" unless defined $pid;
+ if ($pid == 0) {
+ # Child process, give it a slice to process
+ $slice_index = $proc;
+ last;
+ }
+ }
+ # Fudge the commit count a bit to spread out the Elasticsearch commits
+ $commit *= 1 + 0.10 * ($slice_index - 1);
+}
+
+my %iterator_options;
+if ($slice_index) {
+ _log(1, "Processing slice $slice_index of $slice_count\n");
+ $iterator_options{slice} = { index => $slice_index, count => $slice_count };
+}
my $next;
if ($index_biblios) {
_log(1, "Indexing biblios\n");
- if (@biblionumbers) {
+ if (@record_numbers) {
$next = sub {
- my $r = shift @biblionumbers;
+ my $r = shift @record_numbers;
return () unless defined $r;
return ($r, Koha::BiblioUtils->get_from_biblionumber($r, item_data => 1 ));
};
} else {
- my $records = Koha::BiblioUtils->get_all_biblios_iterator();
+ my $records = Koha::BiblioUtils->get_all_biblios_iterator(%iterator_options);
$next = sub {
$records->next();
}
}
- do_reindex($next, $Koha::SearchEngine::Elasticsearch::BIBLIOS_INDEX);
+ _do_reindex($next, $Koha::SearchEngine::Elasticsearch::BIBLIOS_INDEX);
}
if ($index_authorities) {
_log(1, "Indexing authorities\n");
- if (@biblionumbers) {
+ if (@record_numbers) {
$next = sub {
- my $r = shift @biblionumbers;
+ my $r = shift @record_numbers;
return () unless defined $r;
my $a = Koha::MetadataRecord::Authority->get_from_authid($r);
return ($r, $a->record);
};
} else {
- my $records = Koha::MetadataRecord::Authority->get_all_authorities_iterator();
+ my $records = Koha::MetadataRecord::Authority->get_all_authorities_iterator(%iterator_options);
$next = sub {
$records->next();
}
}
- do_reindex($next, $Koha::SearchEngine::Elasticsearch::AUTHORITIES_INDEX);
+ _do_reindex($next, $Koha::SearchEngine::Elasticsearch::AUTHORITIES_INDEX);
}
-sub do_reindex {
- my ( $next, $index_name ) = @_;
+if ($slice_index == 1) {
+ # Main process, wait for children
+ for (my $proc = 2; $proc <= $processes; $proc++) {
+ wait();
+ }
+}
+
+=head2 _verify_index_state
+
+ _ verify_index_state($Koha::SearchEngine::Elasticsearch::BIBLIOS_INDEX, 1);
+
+Checks the index state and recreates it if requested.
+=cut
+
+sub _verify_index_state {
+ my ( $index_name, $recreate ) = @_;
+
+ _log(1, "Checking state of $index_name index\n");
my $indexer = Koha::SearchEngine::Elasticsearch::Indexer->new( { index => $index_name } );
- if ($delete) {
+ if ($recreate) {
+ _log(1, "Dropping and recreating $index_name index\n");
$indexer->drop_index() if $indexer->index_exists();
$indexer->create_index();
}
} elsif ($indexer->is_index_status_recreate_required) {
warn qq/Index "$index_name" has status "recreate required", suggesting it should be recreated/;
}
+}
+
+=head2 _do_reindex
+
+ _do_reindex($callback, $Koha::SearchEngine::Elasticsearch::BIBLIOS_INDEX);
+
+Does the actual reindexing. $callback is a function that always returns the next record.
+
+=cut
+
+sub _do_reindex {
+ my ( $next, $index_name ) = @_;
+
+ my $indexer = Koha::SearchEngine::Elasticsearch::Indexer->new( { index => $index_name } );
my $count = 0;
my $commit_count = $commit;
push @id_buffer, $id;
push @commit_buffer, $record;
if ( !( --$commit_count ) ) {
- _log( 1, "Committing $commit records..." );
+ _log( 1, "Committing $commit records...\n" );
$indexer->update_index( \@id_buffer, \@commit_buffer );
$commit_count = $commit;
@id_buffer = ();
@commit_buffer = ();
- _log( 1, " done\n" );
+ _log( 1, "Commit complete\n" );
}
}
_log( 1, "Total $count records indexed\n" );
}
-# Checks some basic stuff to ensure that it's sane before we start.
-sub sanity_check {
+=head2 _sanity_check
+
+ _sanity_check();
+
+Checks some basic stuff to ensure that it's sane before we start.
+
+=cut
+
+sub _sanity_check {
# Do we have an elasticsearch block defined?
my $conf = C4::Context->config('elasticsearch');
die "No 'elasticsearch' block is defined in koha-conf.xml.\n" if ( !$conf );
}
-# Output progress information.
-#
-# _log($level, $msg);
-#
-# Will output $msg if the verbosity setting is set to $level or more. Will
-# not include a trailing newline.
+=head2 _log
+
+ _log($level, "Message\n");
+
+Output progress information.
+
+Will output the message if verbosity level is set to $level or more. Will not
+include a trailing newline automatically.
+
+=cut
+
sub _log {
my ($level, $msg) = @_;
- print $msg if ($verbose >= $level);
+ print "[$$] $msg" if ($verbose >= $level);
}