5f7d8dd7cd969590f17e3e34383a60ec46c08390
[koha-equinox.git] / C4 / ImportBatch.pm
1 package C4::ImportBatch;
2
3 # Copyright (C) 2007 LibLime, 2012 C & P Bibliography Services
4 #
5 # This file is part of Koha.
6 #
7 # Koha is free software; you can redistribute it and/or modify it under the
8 # terms of the GNU General Public License as published by the Free Software
9 # Foundation; either version 2 of the License, or (at your option) any later
10 # version.
11 #
12 # Koha is distributed in the hope that it will be useful, but WITHOUT ANY
13 # WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
14 # A PARTICULAR PURPOSE.  See the GNU General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License along
17 # with Koha; if not, write to the Free Software Foundation, Inc.,
18 # 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
19
20 use strict;
21 use warnings;
22
23 use C4::Context;
24 use C4::Koha;
25 use C4::Biblio;
26 use C4::Items;
27 use C4::Charset;
28 use C4::AuthoritiesMarc;
29 use C4::MarcModificationTemplates;
30
31 use vars qw($VERSION @ISA @EXPORT @EXPORT_OK %EXPORT_TAGS);
32
33 BEGIN {
34         # set the version for version checking
35     $VERSION = 3.07.00.049;
36         require Exporter;
37         @ISA    = qw(Exporter);
38         @EXPORT = qw(
39     GetZ3950BatchId
40     GetWebserviceBatchId
41     GetImportRecordMarc
42     GetImportRecordMarcXML
43     AddImportBatch
44     GetImportBatch
45     AddAuthToBatch
46     AddBiblioToBatch
47     AddItemsToImportBiblio
48     ModAuthorityInBatch
49     ModBiblioInBatch
50
51     BatchStageMarcRecords
52     BatchFindDuplicates
53     BatchCommitRecords
54     BatchRevertRecords
55     CleanBatch
56
57     GetAllImportBatches
58     GetStagedWebserviceBatches
59     GetImportBatchRangeDesc
60     GetNumberOfNonZ3950ImportBatches
61     GetImportBiblios
62     GetImportRecordsRange
63         GetItemNumbersFromImportBatch
64     
65     GetImportBatchStatus
66     SetImportBatchStatus
67     GetImportBatchOverlayAction
68     SetImportBatchOverlayAction
69     GetImportBatchNoMatchAction
70     SetImportBatchNoMatchAction
71     GetImportBatchItemAction
72     SetImportBatchItemAction
73     GetImportBatchMatcher
74     SetImportBatchMatcher
75     GetImportRecordOverlayStatus
76     SetImportRecordOverlayStatus
77     GetImportRecordStatus
78     SetImportRecordStatus
79     GetImportRecordMatches
80     SetImportRecordMatches
81         );
82 }
83
84 =head1 NAME
85
86 C4::ImportBatch - manage batches of imported MARC records
87
88 =head1 SYNOPSIS
89
90 use C4::ImportBatch;
91
92 =head1 FUNCTIONS
93
94 =head2 GetZ3950BatchId
95
96   my $batchid = GetZ3950BatchId($z3950server);
97
98 Retrieves the ID of the import batch for the Z39.50
99 reservoir for the given target.  If necessary,
100 creates the import batch.
101
102 =cut
103
104 sub GetZ3950BatchId {
105     my ($z3950server) = @_;
106
107     my $dbh = C4::Context->dbh;
108     my $sth = $dbh->prepare("SELECT import_batch_id FROM import_batches
109                              WHERE  batch_type = 'z3950'
110                              AND    file_name = ?");
111     $sth->execute($z3950server);
112     my $rowref = $sth->fetchrow_arrayref();
113     $sth->finish();
114     if (defined $rowref) {
115         return $rowref->[0];
116     } else {
117         my $batch_id = AddImportBatch( {
118                 overlay_action => 'create_new',
119                 import_status => 'staged',
120                 batch_type => 'z3950',
121                 file_name => $z3950server,
122             } );
123         return $batch_id;
124     }
125     
126 }
127
128 =head2 GetWebserviceBatchId
129
130   my $batchid = GetWebserviceBatchId();
131
132 Retrieves the ID of the import batch for webservice.
133 If necessary, creates the import batch.
134
135 =cut
136
137 my $WEBSERVICE_BASE_QRY = <<EOQ;
138 SELECT import_batch_id FROM import_batches
139 WHERE  batch_type = 'webservice'
140 AND    import_status = 'staged'
141 EOQ
142 sub GetWebserviceBatchId {
143     my ($params) = @_;
144
145     my $dbh = C4::Context->dbh;
146     my $sql = $WEBSERVICE_BASE_QRY;
147     my @args;
148     foreach my $field (qw(matcher_id overlay_action nomatch_action item_action)) {
149         if (my $val = $params->{$field}) {
150             $sql .= " AND $field = ?";
151             push @args, $val;
152         }
153     }
154     my $id = $dbh->selectrow_array($sql, undef, @args);
155     return $id if $id;
156
157     $params->{batch_type} = 'webservice';
158     $params->{import_status} = 'staged';
159     return AddImportBatch($params);
160 }
161
162 =head2 GetImportRecordMarc
163
164   my ($marcblob, $encoding) = GetImportRecordMarc($import_record_id);
165
166 =cut
167
168 sub GetImportRecordMarc {
169     my ($import_record_id) = @_;
170
171     my $dbh = C4::Context->dbh;
172     my ( $marc, $encoding ) = $dbh->selectrow_array(q|
173         SELECT marc, encoding
174         FROM import_records
175         WHERE import_record_id = ?
176     |, undef, $import_record_id );
177
178     return $marc, $encoding;
179 }
180
181 sub GetRecordFromImportBiblio {
182     my ( $import_record_id, $embed_items ) = @_;
183
184     my ($marc) = GetImportRecordMarc($import_record_id);
185     my $record = MARC::Record->new_from_usmarc($marc);
186
187     EmbedItemsInImportBiblio( $record, $import_record_id ) if $embed_items;
188
189     return $record;
190 }
191
192 sub EmbedItemsInImportBiblio {
193     my ( $record, $import_record_id ) = @_;
194     my ( $itemtag, $itemsubfield ) = GetMarcFromKohaField("items.itemnumber", '');
195     my $dbh = C4::Context->dbh;
196     my $import_items = $dbh->selectall_arrayref(q|
197         SELECT import_items.marcxml
198         FROM import_items
199         WHERE import_record_id = ?
200     |, { Slice => {} }, $import_record_id );
201     my @item_fields;
202     for my $import_item ( @$import_items ) {
203         my $item_marc = MARC::Record::new_from_xml($import_item->{marcxml});
204         push @item_fields, $item_marc->field($itemtag);
205     }
206     $record->append_fields(@item_fields);
207     return $record;
208 }
209
210 =head2 GetImportRecordMarcXML
211
212   my $marcxml = GetImportRecordMarcXML($import_record_id);
213
214 =cut
215
216 sub GetImportRecordMarcXML {
217     my ($import_record_id) = @_;
218
219     my $dbh = C4::Context->dbh;
220     my $sth = $dbh->prepare("SELECT marcxml FROM import_records WHERE import_record_id = ?");
221     $sth->execute($import_record_id);
222     my ($marcxml) = $sth->fetchrow();
223     $sth->finish();
224     return $marcxml;
225
226 }
227
228 =head2 AddImportBatch
229
230   my $batch_id = AddImportBatch($params_hash);
231
232 =cut
233
234 sub AddImportBatch {
235     my ($params) = @_;
236
237     my (@fields, @vals);
238     foreach (qw( matcher_id template_id branchcode
239                  overlay_action nomatch_action item_action
240                  import_status batch_type file_name comments record_type )) {
241         if (exists $params->{$_}) {
242             push @fields, $_;
243             push @vals, $params->{$_};
244         }
245     }
246     my $dbh = C4::Context->dbh;
247     $dbh->do("INSERT INTO import_batches (".join( ',', @fields).")
248                                   VALUES (".join( ',', map '?', @fields).")",
249              undef,
250              @vals);
251     return $dbh->{'mysql_insertid'};
252 }
253
254 =head2 GetImportBatch 
255
256   my $row = GetImportBatch($batch_id);
257
258 Retrieve a hashref of an import_batches row.
259
260 =cut
261
262 sub GetImportBatch {
263     my ($batch_id) = @_;
264
265     my $dbh = C4::Context->dbh;
266     my $sth = $dbh->prepare_cached("SELECT * FROM import_batches WHERE import_batch_id = ?");
267     $sth->bind_param(1, $batch_id);
268     $sth->execute();
269     my $result = $sth->fetchrow_hashref;
270     $sth->finish();
271     return $result;
272
273 }
274
275 =head2 AddBiblioToBatch 
276
277   my $import_record_id = AddBiblioToBatch($batch_id, $record_sequence, 
278                 $marc_record, $encoding, $z3950random, $update_counts);
279
280 =cut
281
282 sub AddBiblioToBatch {
283     my $batch_id = shift;
284     my $record_sequence = shift;
285     my $marc_record = shift;
286     my $encoding = shift;
287     my $z3950random = shift;
288     my $update_counts = @_ ? shift : 1;
289
290     my $import_record_id = _create_import_record($batch_id, $record_sequence, $marc_record, 'biblio', $encoding, $z3950random, C4::Context->preference('marcflavour'));
291     _add_biblio_fields($import_record_id, $marc_record);
292     _update_batch_record_counts($batch_id) if $update_counts;
293     return $import_record_id;
294 }
295
296 =head2 ModBiblioInBatch
297
298   ModBiblioInBatch($import_record_id, $marc_record);
299
300 =cut
301
302 sub ModBiblioInBatch {
303     my ($import_record_id, $marc_record) = @_;
304
305     _update_import_record_marc($import_record_id, $marc_record, C4::Context->preference('marcflavour'));
306     _update_biblio_fields($import_record_id, $marc_record);
307
308 }
309
310 =head2 AddAuthToBatch
311
312   my $import_record_id = AddAuthToBatch($batch_id, $record_sequence,
313                 $marc_record, $encoding, $z3950random, $update_counts, [$marc_type]);
314
315 =cut
316
317 sub AddAuthToBatch {
318     my $batch_id = shift;
319     my $record_sequence = shift;
320     my $marc_record = shift;
321     my $encoding = shift;
322     my $z3950random = shift;
323     my $update_counts = @_ ? shift : 1;
324     my $marc_type = shift || C4::Context->preference('marcflavour');
325
326     $marc_type = 'UNIMARCAUTH' if $marc_type eq 'UNIMARC';
327
328     my $import_record_id = _create_import_record($batch_id, $record_sequence, $marc_record, 'auth', $encoding, $z3950random, $marc_type);
329     _add_auth_fields($import_record_id, $marc_record);
330     _update_batch_record_counts($batch_id) if $update_counts;
331     return $import_record_id;
332 }
333
334 =head2 ModAuthInBatch
335
336   ModAuthInBatch($import_record_id, $marc_record);
337
338 =cut
339
340 sub ModAuthInBatch {
341     my ($import_record_id, $marc_record) = @_;
342
343     my $marcflavour = C4::Context->preference('marcflavour');
344     _update_import_record_marc($import_record_id, $marc_record, $marcflavour eq 'UNIMARC' ? 'UNIMARCAUTH' : 'USMARC');
345
346 }
347
348 =head2 BatchStageMarcRecords
349
350   ($batch_id, $num_records, $num_items, @invalid_records) = 
351     BatchStageMarcRecords($encoding, $marc_records, $file_name, $marc_modification_template,
352                           $comments, $branch_code, $parse_items,
353                           $leave_as_staging, 
354                           $progress_interval, $progress_callback);
355
356 =cut
357
358 sub BatchStageMarcRecords {
359     my $record_type = shift;
360     my $encoding = shift;
361     my $marc_records = shift;
362     my $file_name = shift;
363     my $marc_modification_template = shift;
364     my $comments = shift;
365     my $branch_code = shift;
366     my $parse_items = shift;
367     my $leave_as_staging = shift;
368
369     # optional callback to monitor status 
370     # of job
371     my $progress_interval = 0;
372     my $progress_callback = undef;
373     if ($#_ == 1) {
374         $progress_interval = shift;
375         $progress_callback = shift;
376         $progress_interval = 0 unless $progress_interval =~ /^\d+$/ and $progress_interval > 0;
377         $progress_interval = 0 unless 'CODE' eq ref $progress_callback;
378     } 
379     
380     my $batch_id = AddImportBatch( {
381             overlay_action => 'create_new',
382             import_status => 'staging',
383             batch_type => 'batch',
384             file_name => $file_name,
385             comments => $comments,
386             record_type => $record_type,
387         } );
388     if ($parse_items) {
389         SetImportBatchItemAction($batch_id, 'always_add');
390     } else {
391         SetImportBatchItemAction($batch_id, 'ignore');
392     }
393
394     my $marc_type = C4::Context->preference('marcflavour');
395     $marc_type .= 'AUTH' if ($marc_type eq 'UNIMARC' && $record_type eq 'auth');
396     my @invalid_records = ();
397     my $num_valid = 0;
398     my $num_items = 0;
399     # FIXME - for now, we're dealing only with bibs
400     my $rec_num = 0;
401     foreach my $marc_blob (split(/\x1D/, $marc_records)) {
402         $marc_blob =~ s/^\s+//g;
403         $marc_blob =~ s/\s+$//g;
404         next unless $marc_blob;
405         $rec_num++;
406         if ($progress_interval and (0 == ($rec_num % $progress_interval))) {
407             &$progress_callback($rec_num);
408         }
409         my ($marc_record, $charset_guessed, $char_errors) =
410             MarcToUTF8Record($marc_blob, $marc_type, $encoding);
411
412         $encoding = $charset_guessed unless $encoding;
413
414         ModifyRecordWithTemplate( $marc_modification_template, $marc_record ) if ( $marc_modification_template );
415
416         my $import_record_id;
417         if (scalar($marc_record->fields()) == 0) {
418             push @invalid_records, $marc_blob;
419         } else {
420
421             # Normalize the record so it doesn't have separated diacritics
422             SetUTF8Flag($marc_record);
423
424             $num_valid++;
425             if ($record_type eq 'biblio') {
426                 $import_record_id = AddBiblioToBatch($batch_id, $rec_num, $marc_record, $encoding, int(rand(99999)), 0);
427                 if ($parse_items) {
428                     my @import_items_ids = AddItemsToImportBiblio($batch_id, $import_record_id, $marc_record, 0);
429                     $num_items += scalar(@import_items_ids);
430                 }
431             } elsif ($record_type eq 'auth') {
432                 $import_record_id = AddAuthToBatch($batch_id, $rec_num, $marc_record, $encoding, int(rand(99999)), 0, $marc_type);
433             }
434         }
435     }
436     unless ($leave_as_staging) {
437         SetImportBatchStatus($batch_id, 'staged');
438     }
439     # FIXME branch_code, number of bibs, number of items
440     _update_batch_record_counts($batch_id);
441     return ($batch_id, $num_valid, $num_items, @invalid_records);
442 }
443
444 =head2 AddItemsToImportBiblio
445
446   my @import_items_ids = AddItemsToImportBiblio($batch_id, 
447                 $import_record_id, $marc_record, $update_counts);
448
449 =cut
450
451 sub AddItemsToImportBiblio {
452     my $batch_id = shift;
453     my $import_record_id = shift;
454     my $marc_record = shift;
455     my $update_counts = @_ ? shift : 0;
456
457     my @import_items_ids = ();
458    
459     my $dbh = C4::Context->dbh; 
460     my ($item_tag,$item_subfield) = &GetMarcFromKohaField("items.itemnumber",'');
461     foreach my $item_field ($marc_record->field($item_tag)) {
462         my $item_marc = MARC::Record->new();
463         $item_marc->leader("00000    a              "); # must set Leader/09 to 'a'
464         $item_marc->append_fields($item_field);
465         $marc_record->delete_field($item_field);
466         my $sth = $dbh->prepare_cached("INSERT INTO import_items (import_record_id, status, marcxml)
467                                         VALUES (?, ?, ?)");
468         $sth->bind_param(1, $import_record_id);
469         $sth->bind_param(2, 'staged');
470         $sth->bind_param(3, $item_marc->as_xml());
471         $sth->execute();
472         push @import_items_ids, $dbh->{'mysql_insertid'};
473         $sth->finish();
474     }
475
476     if ($#import_items_ids > -1) {
477         _update_batch_record_counts($batch_id) if $update_counts;
478         _update_import_record_marc($import_record_id, $marc_record, C4::Context->preference('marcflavour'));
479     }
480     return @import_items_ids;
481 }
482
483 =head2 BatchFindDuplicates
484
485   my $num_with_matches = BatchFindDuplicates($batch_id, $matcher,
486              $max_matches, $progress_interval, $progress_callback);
487
488 Goes through the records loaded in the batch and attempts to 
489 find duplicates for each one.  Sets the matching status 
490 of each record to "no_match" or "auto_match" as appropriate.
491
492 The $max_matches parameter is optional; if it is not supplied,
493 it defaults to 10.
494
495 The $progress_interval and $progress_callback parameters are 
496 optional; if both are supplied, the sub referred to by
497 $progress_callback will be invoked every $progress_interval
498 records using the number of records processed as the 
499 singular argument.
500
501 =cut
502
503 sub BatchFindDuplicates {
504     my $batch_id = shift;
505     my $matcher = shift;
506     my $max_matches = @_ ? shift : 10;
507
508     # optional callback to monitor status 
509     # of job
510     my $progress_interval = 0;
511     my $progress_callback = undef;
512     if ($#_ == 1) {
513         $progress_interval = shift;
514         $progress_callback = shift;
515         $progress_interval = 0 unless $progress_interval =~ /^\d+$/ and $progress_interval > 0;
516         $progress_interval = 0 unless 'CODE' eq ref $progress_callback;
517     }
518
519     my $dbh = C4::Context->dbh;
520
521     my $sth = $dbh->prepare("SELECT import_record_id, record_type, marc
522                              FROM import_records
523                              WHERE import_batch_id = ?");
524     $sth->execute($batch_id);
525     my $num_with_matches = 0;
526     my $rec_num = 0;
527     while (my $rowref = $sth->fetchrow_hashref) {
528         $rec_num++;
529         if ($progress_interval and (0 == ($rec_num % $progress_interval))) {
530             &$progress_callback($rec_num);
531         }
532         my $marc_record = MARC::Record->new_from_usmarc($rowref->{'marc'});
533         my @matches = ();
534         if (defined $matcher) {
535             @matches = $matcher->get_matches($marc_record, $max_matches);
536         }
537         if (scalar(@matches) > 0) {
538             $num_with_matches++;
539             SetImportRecordMatches($rowref->{'import_record_id'}, @matches);
540             SetImportRecordOverlayStatus($rowref->{'import_record_id'}, 'auto_match');
541         } else {
542             SetImportRecordMatches($rowref->{'import_record_id'}, ());
543             SetImportRecordOverlayStatus($rowref->{'import_record_id'}, 'no_match');
544         }
545     }
546     $sth->finish();
547     return $num_with_matches;
548 }
549
550 =head2 BatchCommitRecords
551
552   my ($num_added, $num_updated, $num_items_added, $num_items_replaced, $num_items_errored, $num_ignored) =
553         BatchCommitRecords($batch_id, $framework,
554         $progress_interval, $progress_callback);
555
556 =cut
557
558 sub BatchCommitRecords {
559     my $batch_id = shift;
560     my $framework = shift;
561
562     # optional callback to monitor status 
563     # of job
564     my $progress_interval = 0;
565     my $progress_callback = undef;
566     if ($#_ == 1) {
567         $progress_interval = shift;
568         $progress_callback = shift;
569         $progress_interval = 0 unless $progress_interval =~ /^\d+$/ and $progress_interval > 0;
570         $progress_interval = 0 unless 'CODE' eq ref $progress_callback;
571     }
572
573     my $record_type;
574     my $num_added = 0;
575     my $num_updated = 0;
576     my $num_items_added = 0;
577     my $num_items_replaced = 0;
578     my $num_items_errored = 0;
579     my $num_ignored = 0;
580     # commit (i.e., save, all records in the batch)
581     SetImportBatchStatus('importing');
582     my $overlay_action = GetImportBatchOverlayAction($batch_id);
583     my $nomatch_action = GetImportBatchNoMatchAction($batch_id);
584     my $item_action = GetImportBatchItemAction($batch_id);
585     my $item_tag;
586     my $item_subfield;
587     my $dbh = C4::Context->dbh;
588     my $sth = $dbh->prepare("SELECT import_records.import_record_id, record_type, status, overlay_status, marc, encoding
589                              FROM import_records
590                              LEFT JOIN import_auths ON (import_records.import_record_id=import_auths.import_record_id)
591                              LEFT JOIN import_biblios ON (import_records.import_record_id=import_biblios.import_record_id)
592                              WHERE import_batch_id = ?");
593     $sth->execute($batch_id);
594     my $marcflavour = C4::Context->preference('marcflavour');
595     my $rec_num = 0;
596     while (my $rowref = $sth->fetchrow_hashref) {
597         $record_type = $rowref->{'record_type'};
598         $rec_num++;
599         if ($progress_interval and (0 == ($rec_num % $progress_interval))) {
600             &$progress_callback($rec_num);
601         }
602         if ($rowref->{'status'} eq 'error' or $rowref->{'status'} eq 'imported') {
603             $num_ignored++;
604             next;
605         }
606
607         my $marc_type;
608         if ($marcflavour eq 'UNIMARC' && $record_type eq 'auth') {
609             $marc_type = 'UNIMARCAUTH';
610         } elsif ($marcflavour eq 'UNIMARC') {
611             $marc_type = 'UNIMARC';
612         } else {
613             $marc_type = 'USMARC';
614         }
615         my $marc_record = MARC::Record->new_from_usmarc($rowref->{'marc'});
616
617         if ($record_type eq 'biblio') {
618             # remove any item tags - rely on BatchCommitItems
619             ($item_tag,$item_subfield) = &GetMarcFromKohaField("items.itemnumber",'');
620             foreach my $item_field ($marc_record->field($item_tag)) {
621                 $marc_record->delete_field($item_field);
622             }
623         }
624
625         my ($record_result, $item_result, $record_match) =
626             _get_commit_action($overlay_action, $nomatch_action, $item_action, 
627                                $rowref->{'overlay_status'}, $rowref->{'import_record_id'}, $record_type);
628
629         my $recordid;
630         my $query;
631         if ($record_result eq 'create_new') {
632             $num_added++;
633             if ($record_type eq 'biblio') {
634                 my $biblioitemnumber;
635                 ($recordid, $biblioitemnumber) = AddBiblio($marc_record, $framework);
636                 $query = "UPDATE import_biblios SET matched_biblionumber = ? WHERE import_record_id = ?";
637                 if ($item_result eq 'create_new' || $item_result eq 'replace') {
638                     my ($bib_items_added, $bib_items_replaced, $bib_items_errored) = BatchCommitItems($rowref->{'import_record_id'}, $recordid, $item_result);
639                     $num_items_added += $bib_items_added;
640                     $num_items_replaced += $bib_items_replaced;
641                     $num_items_errored += $bib_items_errored;
642                 }
643             } else {
644                 $recordid = AddAuthority($marc_record, undef, GuessAuthTypeCode($marc_record));
645                 $query = "UPDATE import_auths SET matched_authid = ? WHERE import_record_id = ?";
646             }
647             my $sth = $dbh->prepare_cached($query);
648             $sth->execute($recordid, $rowref->{'import_record_id'});
649             $sth->finish();
650             SetImportRecordStatus($rowref->{'import_record_id'}, 'imported');
651         } elsif ($record_result eq 'replace') {
652             $num_updated++;
653             $recordid = $record_match;
654             my $oldxml;
655             if ($record_type eq 'biblio') {
656                 my $oldbiblio = GetBiblio($recordid);
657                 $oldxml = GetXmlBiblio($recordid);
658
659                 # remove item fields so that they don't get
660                 # added again if record is reverted
661                 # FIXME: GetXmlBiblio output should not contain item info any more! So the next foreach should not be needed. Does not hurt either; may remove old 952s that should not have been there anymore.
662                 my $old_marc = MARC::Record->new_from_xml(StripNonXmlChars($oldxml), 'UTF-8', $rowref->{'encoding'}, $marc_type);
663                 foreach my $item_field ($old_marc->field($item_tag)) {
664                     $old_marc->delete_field($item_field);
665                 }
666                 $oldxml = $old_marc->as_xml($marc_type);
667
668                 ModBiblio($marc_record, $recordid, $oldbiblio->{'frameworkcode'});
669                 $query = "UPDATE import_biblios SET matched_biblionumber = ? WHERE import_record_id = ?";
670
671                 if ($item_result eq 'create_new' || $item_result eq 'replace') {
672                     my ($bib_items_added, $bib_items_replaced, $bib_items_errored) = BatchCommitItems($rowref->{'import_record_id'}, $recordid, $item_result);
673                     $num_items_added += $bib_items_added;
674                     $num_items_replaced += $bib_items_replaced;
675                     $num_items_errored += $bib_items_errored;
676                 }
677             } else {
678                 $oldxml = GetAuthorityXML($recordid);
679
680                 ModAuthority($recordid, $marc_record, GuessAuthTypeCode($marc_record));
681                 $query = "UPDATE import_auths SET matched_authid = ? WHERE import_record_id = ?";
682             }
683             my $sth = $dbh->prepare_cached("UPDATE import_records SET marcxml_old = ? WHERE import_record_id = ?");
684             $sth->execute($oldxml, $rowref->{'import_record_id'});
685             $sth->finish();
686             my $sth2 = $dbh->prepare_cached($query);
687             $sth2->execute($recordid, $rowref->{'import_record_id'});
688             $sth2->finish();
689             SetImportRecordOverlayStatus($rowref->{'import_record_id'}, 'match_applied');
690             SetImportRecordStatus($rowref->{'import_record_id'}, 'imported');
691         } elsif ($record_result eq 'ignore') {
692             $recordid = $record_match;
693             $num_ignored++;
694             $recordid = $record_match;
695             if ($record_type eq 'biblio' and defined $recordid and ( $item_result eq 'create_new' || $item_result eq 'replace' ) ) {
696                 my ($bib_items_added, $bib_items_replaced, $bib_items_errored) = BatchCommitItems($rowref->{'import_record_id'}, $recordid, $item_result);
697                 $num_items_added += $bib_items_added;
698          $num_items_replaced += $bib_items_replaced;
699                 $num_items_errored += $bib_items_errored;
700                 # still need to record the matched biblionumber so that the
701                 # items can be reverted
702                 my $sth2 = $dbh->prepare_cached("UPDATE import_biblios SET matched_biblionumber = ? WHERE import_record_id = ?");
703                 $sth2->execute($recordid, $rowref->{'import_record_id'});
704                 SetImportRecordOverlayStatus($rowref->{'import_record_id'}, 'match_applied');
705             }
706             SetImportRecordStatus($rowref->{'import_record_id'}, 'ignored');
707         }
708     }
709     $sth->finish();
710     SetImportBatchStatus($batch_id, 'imported');
711     return ($num_added, $num_updated, $num_items_added, $num_items_replaced, $num_items_errored, $num_ignored);
712 }
713
714 =head2 BatchCommitItems
715
716   ($num_items_added, $num_items_errored) = 
717          BatchCommitItems($import_record_id, $biblionumber);
718
719 =cut
720
721 sub BatchCommitItems {
722     my ( $import_record_id, $biblionumber, $action ) = @_;
723
724     my $dbh = C4::Context->dbh;
725
726     my $num_items_added = 0;
727     my $num_items_errored = 0;
728     my $num_items_replaced = 0;
729
730     my $sth = $dbh->prepare( "
731         SELECT import_items_id, import_items.marcxml, encoding
732         FROM import_items
733         JOIN import_records USING (import_record_id)
734         WHERE import_record_id = ?
735         ORDER BY import_items_id
736     " );
737     $sth->bind_param( 1, $import_record_id );
738     $sth->execute();
739
740     while ( my $row = $sth->fetchrow_hashref() ) {
741         my $item_marc = MARC::Record->new_from_xml( StripNonXmlChars( $row->{'marcxml'} ), 'UTF-8', $row->{'encoding'} );
742
743         # Delete date_due subfield as to not accidentally delete item checkout due dates
744         my ( $MARCfield, $MARCsubfield ) = GetMarcFromKohaField( 'items.onloan', GetFrameworkCode($biblionumber) );
745         $item_marc->field($MARCfield)->delete_subfield( code => $MARCsubfield );
746
747         my $item = TransformMarcToKoha( $dbh, $item_marc );
748
749         my $duplicate_barcode = exists( $item->{'barcode'} ) && GetItemnumberFromBarcode( $item->{'barcode'} );
750         my $duplicate_itemnumber = exists( $item->{'itemnumber'} );
751
752         my $updsth = $dbh->prepare("UPDATE import_items SET status = ?, itemnumber = ? WHERE import_items_id = ?");
753         if ( $action eq "replace" && $duplicate_itemnumber ) {
754             # Duplicate itemnumbers have precedence, that way we can update barcodes by overlaying
755             ModItemFromMarc( $item_marc, $biblionumber, $item->{itemnumber} );
756             $updsth->bind_param( 1, 'imported' );
757             $updsth->bind_param( 2, $item->{itemnumber} );
758             $updsth->bind_param( 3, $row->{'import_items_id'} );
759             $updsth->execute();
760             $updsth->finish();
761             $num_items_replaced++;
762         } elsif ( $action eq "replace" && $duplicate_barcode ) {
763             my $itemnumber = GetItemnumberFromBarcode( $item->{'barcode'} );
764             ModItemFromMarc( $item_marc, $biblionumber, $itemnumber );
765             $updsth->bind_param( 1, 'imported' );
766             $updsth->bind_param( 2, $item->{itemnumber} );
767             $updsth->bind_param( 3, $row->{'import_items_id'} );
768             $updsth->execute();
769             $updsth->finish();
770             $num_items_replaced++;
771         } elsif ($duplicate_barcode) {
772             $updsth->bind_param( 1, 'error' );
773             $updsth->bind_param( 2, 'duplicate item barcode' );
774             $updsth->bind_param( 3, $row->{'import_items_id'} );
775             $updsth->execute();
776             $num_items_errored++;
777         } else {
778             my ( $item_biblionumber, $biblioitemnumber, $itemnumber ) = AddItemFromMarc( $item_marc, $biblionumber );
779             $updsth->bind_param( 1, 'imported' );
780             $updsth->bind_param( 2, $itemnumber );
781             $updsth->bind_param( 3, $row->{'import_items_id'} );
782             $updsth->execute();
783             $updsth->finish();
784             $num_items_added++;
785         }
786     }
787
788     return ( $num_items_added, $num_items_replaced, $num_items_errored );
789 }
790
791 =head2 BatchRevertRecords
792
793   my ($num_deleted, $num_errors, $num_reverted, $num_items_deleted, 
794       $num_ignored) = BatchRevertRecords($batch_id);
795
796 =cut
797
798 sub BatchRevertRecords {
799     my $batch_id = shift;
800
801     my $record_type;
802     my $num_deleted = 0;
803     my $num_errors = 0;
804     my $num_reverted = 0;
805     my $num_ignored = 0;
806     my $num_items_deleted = 0;
807     # commit (i.e., save, all records in the batch)
808     SetImportBatchStatus('reverting');
809     my $overlay_action = GetImportBatchOverlayAction($batch_id);
810     my $nomatch_action = GetImportBatchNoMatchAction($batch_id);
811     my $dbh = C4::Context->dbh;
812     my $sth = $dbh->prepare("SELECT import_records.import_record_id, record_type, status, overlay_status, marcxml_old, encoding, matched_biblionumber, matched_authid
813                              FROM import_records
814                              LEFT JOIN import_auths ON (import_records.import_record_id=import_auths.import_record_id)
815                              LEFT JOIN import_biblios ON (import_records.import_record_id=import_biblios.import_record_id)
816                              WHERE import_batch_id = ?");
817     $sth->execute($batch_id);
818     my $marc_type;
819     my $marcflavour = C4::Context->preference('marcflavour');
820     while (my $rowref = $sth->fetchrow_hashref) {
821         $record_type = $rowref->{'record_type'};
822         if ($rowref->{'status'} eq 'error' or $rowref->{'status'} eq 'reverted') {
823             $num_ignored++;
824             next;
825         }
826         if ($marcflavour eq 'UNIMARC' && $record_type eq 'auth') {
827             $marc_type = 'UNIMARCAUTH';
828         } elsif ($marcflavour eq 'UNIMARC') {
829             $marc_type = 'UNIMARC';
830         } else {
831             $marc_type = 'USMARC';
832         }
833
834         my $record_result = _get_revert_action($overlay_action, $rowref->{'overlay_status'}, $rowref->{'status'});
835
836         if ($record_result eq 'delete') {
837             my $error = undef;
838             if  ($record_type eq 'biblio') {
839                 $num_items_deleted += BatchRevertItems($rowref->{'import_record_id'}, $rowref->{'matched_biblionumber'});
840                 $error = DelBiblio($rowref->{'matched_biblionumber'});
841             } else {
842                 my $deletedauthid = DelAuthority($rowref->{'matched_authid'});
843             }
844             if (defined $error) {
845                 $num_errors++;
846             } else {
847                 $num_deleted++;
848                 SetImportRecordStatus($rowref->{'import_record_id'}, 'reverted');
849             }
850         } elsif ($record_result eq 'restore') {
851             $num_reverted++;
852             my $old_record = MARC::Record->new_from_xml(StripNonXmlChars($rowref->{'marcxml_old'}), 'UTF-8', $rowref->{'encoding'}, $marc_type);
853             if ($record_type eq 'biblio') {
854                 my $biblionumber = $rowref->{'matched_biblionumber'};
855                 my $oldbiblio = GetBiblio($biblionumber);
856                 $num_items_deleted += BatchRevertItems($rowref->{'import_record_id'}, $rowref->{'matched_biblionumber'});
857                 ModBiblio($old_record, $biblionumber, $oldbiblio->{'frameworkcode'});
858             } else {
859                 my $authid = $rowref->{'matched_authid'};
860                 ModAuthority($authid, $old_record, GuessAuthTypeCode($old_record));
861             }
862             SetImportRecordStatus($rowref->{'import_record_id'}, 'reverted');
863         } elsif ($record_result eq 'ignore') {
864             if ($record_type eq 'biblio') {
865                 $num_items_deleted += BatchRevertItems($rowref->{'import_record_id'}, $rowref->{'matched_biblionumber'});
866             }
867             SetImportRecordStatus($rowref->{'import_record_id'}, 'reverted');
868         }
869         my $query;
870         if ($record_type eq 'biblio') {
871             # remove matched_biblionumber only if there is no 'imported' item left
872             $query = "UPDATE import_biblios SET matched_biblionumber = NULL WHERE import_record_id = ?";
873             $query = "UPDATE import_biblios SET matched_biblionumber = NULL WHERE import_record_id = ?  AND NOT EXISTS (SELECT * FROM import_items WHERE import_items.import_record_id=import_biblios.import_record_id and status='imported')";
874         } else {
875             $query = "UPDATE import_auths SET matched_authid = NULL WHERE import_record_id = ?";
876         }
877         my $sth2 = $dbh->prepare_cached($query);
878         $sth2->execute($rowref->{'import_record_id'});
879     }
880
881     $sth->finish();
882     SetImportBatchStatus($batch_id, 'reverted');
883     return ($num_deleted, $num_errors, $num_reverted, $num_items_deleted, $num_ignored);
884 }
885
886 =head2 BatchRevertItems
887
888   my $num_items_deleted = BatchRevertItems($import_record_id, $biblionumber);
889
890 =cut
891
892 sub BatchRevertItems {
893     my ($import_record_id, $biblionumber) = @_;
894
895     my $dbh = C4::Context->dbh;
896     my $num_items_deleted = 0;
897
898     my $sth = $dbh->prepare_cached("SELECT import_items_id, itemnumber
899                                    FROM import_items
900                                    JOIN items USING (itemnumber)
901                                    WHERE import_record_id = ?");
902     $sth->bind_param(1, $import_record_id);
903     $sth->execute();
904     while (my $row = $sth->fetchrow_hashref()) {
905         my $error = DelItemCheck($dbh, $biblionumber, $row->{'itemnumber'});
906         if ($error == 1){
907             my $updsth = $dbh->prepare("UPDATE import_items SET status = ? WHERE import_items_id = ?");
908             $updsth->bind_param(1, 'reverted');
909             $updsth->bind_param(2, $row->{'import_items_id'});
910             $updsth->execute();
911             $updsth->finish();
912             $num_items_deleted++;
913         }
914         else {
915             next;
916         }
917     }
918     $sth->finish();
919     return $num_items_deleted;
920 }
921
922 =head2 CleanBatch
923
924   CleanBatch($batch_id)
925
926 Deletes all staged records from the import batch
927 and sets the status of the batch to 'cleaned'.  Note
928 that deleting a stage record does *not* affect
929 any record that has been committed to the database.
930
931 =cut
932
933 sub CleanBatch {
934     my $batch_id = shift;
935     return unless defined $batch_id;
936
937     C4::Context->dbh->do('DELETE FROM import_records WHERE import_batch_id = ?', {}, $batch_id);
938     SetImportBatchStatus($batch_id, 'cleaned');
939 }
940
941 =head2 GetAllImportBatches
942
943   my $results = GetAllImportBatches();
944
945 Returns a references to an array of hash references corresponding
946 to all import_batches rows (of batch_type 'batch'), sorted in 
947 ascending order by import_batch_id.
948
949 =cut
950
951 sub  GetAllImportBatches {
952     my $dbh = C4::Context->dbh;
953     my $sth = $dbh->prepare_cached("SELECT * FROM import_batches
954                                     WHERE batch_type IN ('batch', 'webservice')
955                                     ORDER BY import_batch_id ASC");
956
957     my $results = [];
958     $sth->execute();
959     while (my $row = $sth->fetchrow_hashref) {
960         push @$results, $row;
961     }
962     $sth->finish();
963     return $results;
964 }
965
966 =head2 GetStagedWebserviceBatches
967
968   my $batch_ids = GetStagedWebserviceBatches();
969
970 Returns a references to an array of batch id's
971 of batch_type 'webservice' that are not imported
972
973 =cut
974
975 my $PENDING_WEBSERVICE_BATCHES_QRY = <<EOQ;
976 SELECT import_batch_id FROM import_batches
977 WHERE batch_type = 'webservice'
978 AND import_status = 'staged'
979 EOQ
980 sub  GetStagedWebserviceBatches {
981     my $dbh = C4::Context->dbh;
982     return $dbh->selectcol_arrayref($PENDING_WEBSERVICE_BATCHES_QRY);
983 }
984
985 =head2 GetImportBatchRangeDesc
986
987   my $results = GetImportBatchRangeDesc($offset, $results_per_group);
988
989 Returns a reference to an array of hash references corresponding to
990 import_batches rows (sorted in descending order by import_batch_id)
991 start at the given offset.
992
993 =cut
994
995 sub GetImportBatchRangeDesc {
996     my ($offset, $results_per_group) = @_;
997
998     my $dbh = C4::Context->dbh;
999     my $query = "SELECT * FROM import_batches
1000                                     WHERE batch_type IN ('batch', 'webservice')
1001                                     ORDER BY import_batch_id DESC";
1002     my @params;
1003     if ($results_per_group){
1004         $query .= " LIMIT ?";
1005         push(@params, $results_per_group);
1006     }
1007     if ($offset){
1008         $query .= " OFFSET ?";
1009         push(@params, $offset);
1010     }
1011     my $sth = $dbh->prepare_cached($query);
1012     $sth->execute(@params);
1013     my $results = $sth->fetchall_arrayref({});
1014     $sth->finish();
1015     return $results;
1016 }
1017
1018 =head2 GetItemNumbersFromImportBatch
1019
1020   my @itemsnos = GetItemNumbersFromImportBatch($batch_id);
1021
1022 =cut
1023
1024 sub GetItemNumbersFromImportBatch {
1025         my ($batch_id) = @_;
1026         my $dbh = C4::Context->dbh;
1027         my $sth = $dbh->prepare("SELECT itemnumber FROM import_batches,import_records,import_items WHERE import_batches.import_batch_id=import_records.import_batch_id AND import_records.import_record_id=import_items.import_record_id AND import_batches.import_batch_id=?");
1028         $sth->execute($batch_id);
1029         my @items ;
1030         while ( my ($itm) = $sth->fetchrow_array ) {
1031                 push @items, $itm;
1032         }
1033         return @items;
1034 }
1035
1036 =head2 GetNumberOfImportBatches 
1037
1038   my $count = GetNumberOfImportBatches();
1039
1040 =cut
1041
1042 sub GetNumberOfNonZ3950ImportBatches {
1043     my $dbh = C4::Context->dbh;
1044     my $sth = $dbh->prepare("SELECT COUNT(*) FROM import_batches WHERE batch_type != 'z3950'");
1045     $sth->execute();
1046     my ($count) = $sth->fetchrow_array();
1047     $sth->finish();
1048     return $count;
1049 }
1050
1051 =head2 GetImportBiblios
1052
1053   my $results = GetImportBiblios($importid);
1054
1055 =cut
1056
1057 sub GetImportBiblios {
1058     my ($import_record_id) = @_;
1059
1060     my $dbh = C4::Context->dbh;
1061     my $query = "SELECT * FROM import_biblios WHERE import_record_id = ?";
1062     return $dbh->selectall_arrayref(
1063         $query,
1064         { Slice => {} },
1065         $import_record_id
1066     );
1067
1068 }
1069
1070 =head2 GetImportRecordsRange
1071
1072   my $results = GetImportRecordsRange($batch_id, $offset, $results_per_group);
1073
1074 Returns a reference to an array of hash references corresponding to
1075 import_biblios/import_auths/import_records rows for a given batch
1076 starting at the given offset.
1077
1078 =cut
1079
1080 sub GetImportRecordsRange {
1081     my ( $batch_id, $offset, $results_per_group, $status, $parameters ) = @_;
1082
1083     my $dbh = C4::Context->dbh;
1084
1085     my $order_by = $parameters->{order_by} || 'import_record_id';
1086     ( $order_by ) = grep( /^$order_by$/, qw( import_record_id title status overlay_status ) ) ? $order_by : 'import_record_id';
1087
1088     my $order_by_direction =
1089       uc( $parameters->{order_by_direction} ) eq 'DESC' ? 'DESC' : 'ASC';
1090
1091     $order_by .= " $order_by_direction, authorized_heading" if $order_by eq 'title';
1092
1093     my $query = "SELECT title, author, isbn, issn, authorized_heading, import_records.import_record_id,
1094                                            record_sequence, status, overlay_status,
1095                                            matched_biblionumber, matched_authid, record_type
1096                                     FROM   import_records
1097                                     LEFT JOIN import_auths ON (import_records.import_record_id=import_auths.import_record_id)
1098                                     LEFT JOIN import_biblios ON (import_records.import_record_id=import_biblios.import_record_id)
1099                                     WHERE  import_batch_id = ?";
1100     my @params;
1101     push(@params, $batch_id);
1102     if ($status) {
1103         $query .= " AND status=?";
1104         push(@params,$status);
1105     }
1106
1107     $query.=" ORDER BY $order_by $order_by_direction";
1108
1109     if($results_per_group){
1110         $query .= " LIMIT ?";
1111         push(@params, $results_per_group);
1112     }
1113     if($offset){
1114         $query .= " OFFSET ?";
1115         push(@params, $offset);
1116     }
1117     my $sth = $dbh->prepare_cached($query);
1118     $sth->execute(@params);
1119     my $results = $sth->fetchall_arrayref({});
1120     $sth->finish();
1121     return $results;
1122
1123 }
1124
1125 =head2 GetBestRecordMatch
1126
1127   my $record_id = GetBestRecordMatch($import_record_id);
1128
1129 =cut
1130
1131 sub GetBestRecordMatch {
1132     my ($import_record_id) = @_;
1133
1134     my $dbh = C4::Context->dbh;
1135     my $sth = $dbh->prepare("SELECT candidate_match_id
1136                              FROM   import_record_matches
1137                              JOIN   import_records ON ( import_record_matches.import_record_id = import_records.import_record_id )
1138                              LEFT JOIN biblio ON ( candidate_match_id = biblio.biblionumber )
1139                              LEFT JOIN auth_header ON ( candidate_match_id = auth_header.authid )
1140                              WHERE  import_record_matches.import_record_id = ? AND
1141                              (  (import_records.record_type = 'biblio' AND biblio.biblionumber IS NOT NULL) OR
1142                                 (import_records.record_type = 'auth' AND auth_header.authid IS NOT NULL) )
1143                              ORDER BY score DESC, candidate_match_id DESC");
1144     $sth->execute($import_record_id);
1145     my ($record_id) = $sth->fetchrow_array();
1146     $sth->finish();
1147     return $record_id;
1148 }
1149
1150 =head2 GetImportBatchStatus
1151
1152   my $status = GetImportBatchStatus($batch_id);
1153
1154 =cut
1155
1156 sub GetImportBatchStatus {
1157     my ($batch_id) = @_;
1158
1159     my $dbh = C4::Context->dbh;
1160     my $sth = $dbh->prepare("SELECT import_status FROM import_batches WHERE import_batch_id = ?");
1161     $sth->execute($batch_id);
1162     my ($status) = $sth->fetchrow_array();
1163     $sth->finish();
1164     return $status;
1165
1166 }
1167
1168 =head2 SetImportBatchStatus
1169
1170   SetImportBatchStatus($batch_id, $new_status);
1171
1172 =cut
1173
1174 sub SetImportBatchStatus {
1175     my ($batch_id, $new_status) = @_;
1176
1177     my $dbh = C4::Context->dbh;
1178     my $sth = $dbh->prepare("UPDATE import_batches SET import_status = ? WHERE import_batch_id = ?");
1179     $sth->execute($new_status, $batch_id);
1180     $sth->finish();
1181
1182 }
1183
1184 =head2 GetImportBatchOverlayAction
1185
1186   my $overlay_action = GetImportBatchOverlayAction($batch_id);
1187
1188 =cut
1189
1190 sub GetImportBatchOverlayAction {
1191     my ($batch_id) = @_;
1192
1193     my $dbh = C4::Context->dbh;
1194     my $sth = $dbh->prepare("SELECT overlay_action FROM import_batches WHERE import_batch_id = ?");
1195     $sth->execute($batch_id);
1196     my ($overlay_action) = $sth->fetchrow_array();
1197     $sth->finish();
1198     return $overlay_action;
1199
1200 }
1201
1202
1203 =head2 SetImportBatchOverlayAction
1204
1205   SetImportBatchOverlayAction($batch_id, $new_overlay_action);
1206
1207 =cut
1208
1209 sub SetImportBatchOverlayAction {
1210     my ($batch_id, $new_overlay_action) = @_;
1211
1212     my $dbh = C4::Context->dbh;
1213     my $sth = $dbh->prepare("UPDATE import_batches SET overlay_action = ? WHERE import_batch_id = ?");
1214     $sth->execute($new_overlay_action, $batch_id);
1215     $sth->finish();
1216
1217 }
1218
1219 =head2 GetImportBatchNoMatchAction
1220
1221   my $nomatch_action = GetImportBatchNoMatchAction($batch_id);
1222
1223 =cut
1224
1225 sub GetImportBatchNoMatchAction {
1226     my ($batch_id) = @_;
1227
1228     my $dbh = C4::Context->dbh;
1229     my $sth = $dbh->prepare("SELECT nomatch_action FROM import_batches WHERE import_batch_id = ?");
1230     $sth->execute($batch_id);
1231     my ($nomatch_action) = $sth->fetchrow_array();
1232     $sth->finish();
1233     return $nomatch_action;
1234
1235 }
1236
1237
1238 =head2 SetImportBatchNoMatchAction
1239
1240   SetImportBatchNoMatchAction($batch_id, $new_nomatch_action);
1241
1242 =cut
1243
1244 sub SetImportBatchNoMatchAction {
1245     my ($batch_id, $new_nomatch_action) = @_;
1246
1247     my $dbh = C4::Context->dbh;
1248     my $sth = $dbh->prepare("UPDATE import_batches SET nomatch_action = ? WHERE import_batch_id = ?");
1249     $sth->execute($new_nomatch_action, $batch_id);
1250     $sth->finish();
1251
1252 }
1253
1254 =head2 GetImportBatchItemAction
1255
1256   my $item_action = GetImportBatchItemAction($batch_id);
1257
1258 =cut
1259
1260 sub GetImportBatchItemAction {
1261     my ($batch_id) = @_;
1262
1263     my $dbh = C4::Context->dbh;
1264     my $sth = $dbh->prepare("SELECT item_action FROM import_batches WHERE import_batch_id = ?");
1265     $sth->execute($batch_id);
1266     my ($item_action) = $sth->fetchrow_array();
1267     $sth->finish();
1268     return $item_action;
1269
1270 }
1271
1272
1273 =head2 SetImportBatchItemAction
1274
1275   SetImportBatchItemAction($batch_id, $new_item_action);
1276
1277 =cut
1278
1279 sub SetImportBatchItemAction {
1280     my ($batch_id, $new_item_action) = @_;
1281
1282     my $dbh = C4::Context->dbh;
1283     my $sth = $dbh->prepare("UPDATE import_batches SET item_action = ? WHERE import_batch_id = ?");
1284     $sth->execute($new_item_action, $batch_id);
1285     $sth->finish();
1286
1287 }
1288
1289 =head2 GetImportBatchMatcher
1290
1291   my $matcher_id = GetImportBatchMatcher($batch_id);
1292
1293 =cut
1294
1295 sub GetImportBatchMatcher {
1296     my ($batch_id) = @_;
1297
1298     my $dbh = C4::Context->dbh;
1299     my $sth = $dbh->prepare("SELECT matcher_id FROM import_batches WHERE import_batch_id = ?");
1300     $sth->execute($batch_id);
1301     my ($matcher_id) = $sth->fetchrow_array();
1302     $sth->finish();
1303     return $matcher_id;
1304
1305 }
1306
1307
1308 =head2 SetImportBatchMatcher
1309
1310   SetImportBatchMatcher($batch_id, $new_matcher_id);
1311
1312 =cut
1313
1314 sub SetImportBatchMatcher {
1315     my ($batch_id, $new_matcher_id) = @_;
1316
1317     my $dbh = C4::Context->dbh;
1318     my $sth = $dbh->prepare("UPDATE import_batches SET matcher_id = ? WHERE import_batch_id = ?");
1319     $sth->execute($new_matcher_id, $batch_id);
1320     $sth->finish();
1321
1322 }
1323
1324 =head2 GetImportRecordOverlayStatus
1325
1326   my $overlay_status = GetImportRecordOverlayStatus($import_record_id);
1327
1328 =cut
1329
1330 sub GetImportRecordOverlayStatus {
1331     my ($import_record_id) = @_;
1332
1333     my $dbh = C4::Context->dbh;
1334     my $sth = $dbh->prepare("SELECT overlay_status FROM import_records WHERE import_record_id = ?");
1335     $sth->execute($import_record_id);
1336     my ($overlay_status) = $sth->fetchrow_array();
1337     $sth->finish();
1338     return $overlay_status;
1339
1340 }
1341
1342
1343 =head2 SetImportRecordOverlayStatus
1344
1345   SetImportRecordOverlayStatus($import_record_id, $new_overlay_status);
1346
1347 =cut
1348
1349 sub SetImportRecordOverlayStatus {
1350     my ($import_record_id, $new_overlay_status) = @_;
1351
1352     my $dbh = C4::Context->dbh;
1353     my $sth = $dbh->prepare("UPDATE import_records SET overlay_status = ? WHERE import_record_id = ?");
1354     $sth->execute($new_overlay_status, $import_record_id);
1355     $sth->finish();
1356
1357 }
1358
1359 =head2 GetImportRecordStatus
1360
1361   my $status = GetImportRecordStatus($import_record_id);
1362
1363 =cut
1364
1365 sub GetImportRecordStatus {
1366     my ($import_record_id) = @_;
1367
1368     my $dbh = C4::Context->dbh;
1369     my $sth = $dbh->prepare("SELECT status FROM import_records WHERE import_record_id = ?");
1370     $sth->execute($import_record_id);
1371     my ($status) = $sth->fetchrow_array();
1372     $sth->finish();
1373     return $status;
1374
1375 }
1376
1377
1378 =head2 SetImportRecordStatus
1379
1380   SetImportRecordStatus($import_record_id, $new_status);
1381
1382 =cut
1383
1384 sub SetImportRecordStatus {
1385     my ($import_record_id, $new_status) = @_;
1386
1387     my $dbh = C4::Context->dbh;
1388     my $sth = $dbh->prepare("UPDATE import_records SET status = ? WHERE import_record_id = ?");
1389     $sth->execute($new_status, $import_record_id);
1390     $sth->finish();
1391
1392 }
1393
1394 =head2 GetImportRecordMatches
1395
1396   my $results = GetImportRecordMatches($import_record_id, $best_only);
1397
1398 =cut
1399
1400 sub GetImportRecordMatches {
1401     my $import_record_id = shift;
1402     my $best_only = @_ ? shift : 0;
1403
1404     my $dbh = C4::Context->dbh;
1405     # FIXME currently biblio only
1406     my $sth = $dbh->prepare_cached("SELECT title, author, biblionumber,
1407                                     candidate_match_id, score, record_type
1408                                     FROM import_records
1409                                     JOIN import_record_matches USING (import_record_id)
1410                                     LEFT JOIN biblio ON (biblionumber = candidate_match_id)
1411                                     WHERE import_record_id = ?
1412                                     ORDER BY score DESC, biblionumber DESC");
1413     $sth->bind_param(1, $import_record_id);
1414     my $results = [];
1415     $sth->execute();
1416     while (my $row = $sth->fetchrow_hashref) {
1417         if ($row->{'record_type'} eq 'auth') {
1418             $row->{'authorized_heading'} = C4::AuthoritiesMarc::GetAuthorizedHeading( { authid => $row->{'candidate_match_id'} } );
1419         }
1420         next if ($row->{'record_type'} eq 'biblio' && not $row->{'biblionumber'});
1421         push @$results, $row;
1422         last if $best_only;
1423     }
1424     $sth->finish();
1425
1426     return $results;
1427     
1428 }
1429
1430
1431 =head2 SetImportRecordMatches
1432
1433   SetImportRecordMatches($import_record_id, @matches);
1434
1435 =cut
1436
1437 sub SetImportRecordMatches {
1438     my $import_record_id = shift;
1439     my @matches = @_;
1440
1441     my $dbh = C4::Context->dbh;
1442     my $delsth = $dbh->prepare("DELETE FROM import_record_matches WHERE import_record_id = ?");
1443     $delsth->execute($import_record_id);
1444     $delsth->finish();
1445
1446     my $sth = $dbh->prepare("INSERT INTO import_record_matches (import_record_id, candidate_match_id, score)
1447                                     VALUES (?, ?, ?)");
1448     foreach my $match (@matches) {
1449         $sth->execute($import_record_id, $match->{'record_id'}, $match->{'score'});
1450     }
1451 }
1452
1453
1454 # internal functions
1455
1456 sub _create_import_record {
1457     my ($batch_id, $record_sequence, $marc_record, $record_type, $encoding, $z3950random, $marc_type) = @_;
1458
1459     my $dbh = C4::Context->dbh;
1460     my $sth = $dbh->prepare("INSERT INTO import_records (import_batch_id, record_sequence, marc, marcxml, 
1461                                                          record_type, encoding, z3950random)
1462                                     VALUES (?, ?, ?, ?, ?, ?, ?)");
1463     $sth->execute($batch_id, $record_sequence, $marc_record->as_usmarc(), $marc_record->as_xml($marc_type),
1464                   $record_type, $encoding, $z3950random);
1465     my $import_record_id = $dbh->{'mysql_insertid'};
1466     $sth->finish();
1467     return $import_record_id;
1468 }
1469
1470 sub _update_import_record_marc {
1471     my ($import_record_id, $marc_record, $marc_type) = @_;
1472
1473     my $dbh = C4::Context->dbh;
1474     my $sth = $dbh->prepare("UPDATE import_records SET marc = ?, marcxml = ?
1475                              WHERE  import_record_id = ?");
1476     $sth->execute($marc_record->as_usmarc(), $marc_record->as_xml($marc_type), $import_record_id);
1477     $sth->finish();
1478 }
1479
1480 sub _add_auth_fields {
1481     my ($import_record_id, $marc_record) = @_;
1482
1483     my $controlnumber;
1484     if ($marc_record->field('001')) {
1485         $controlnumber = $marc_record->field('001')->data();
1486     }
1487     my $authorized_heading = C4::AuthoritiesMarc::GetAuthorizedHeading({ record => $marc_record });
1488     my $dbh = C4::Context->dbh;
1489     my $sth = $dbh->prepare("INSERT INTO import_auths (import_record_id, control_number, authorized_heading) VALUES (?, ?, ?)");
1490     $sth->execute($import_record_id, $controlnumber, $authorized_heading);
1491     $sth->finish();
1492 }
1493
1494 sub _add_biblio_fields {
1495     my ($import_record_id, $marc_record) = @_;
1496
1497     my ($title, $author, $isbn, $issn) = _parse_biblio_fields($marc_record);
1498     my $dbh = C4::Context->dbh;
1499     # FIXME no controlnumber, originalsource
1500     $isbn = C4::Koha::GetNormalizedISBN($isbn);
1501     my $sth = $dbh->prepare("INSERT INTO import_biblios (import_record_id, title, author, isbn, issn) VALUES (?, ?, ?, ?, ?)");
1502     $sth->execute($import_record_id, $title, $author, $isbn, $issn);
1503     $sth->finish();
1504                 
1505 }
1506
1507 sub _update_biblio_fields {
1508     my ($import_record_id, $marc_record) = @_;
1509
1510     my ($title, $author, $isbn, $issn) = _parse_biblio_fields($marc_record);
1511     my $dbh = C4::Context->dbh;
1512     # FIXME no controlnumber, originalsource
1513     # FIXME 2 - should regularize normalization of ISBN wherever it is done
1514     $isbn =~ s/\(.*$//;
1515     $isbn =~ tr/ -_//;
1516     $isbn = uc $isbn;
1517     my $sth = $dbh->prepare("UPDATE import_biblios SET title = ?, author = ?, isbn = ?, issn = ?
1518                              WHERE  import_record_id = ?");
1519     $sth->execute($title, $author, $isbn, $issn, $import_record_id);
1520     $sth->finish();
1521 }
1522
1523 sub _parse_biblio_fields {
1524     my ($marc_record) = @_;
1525
1526     my $dbh = C4::Context->dbh;
1527     my $bibliofields = TransformMarcToKoha($dbh, $marc_record, '');
1528     return ($bibliofields->{'title'}, $bibliofields->{'author'}, $bibliofields->{'isbn'}, $bibliofields->{'issn'});
1529
1530 }
1531
1532 sub _update_batch_record_counts {
1533     my ($batch_id) = @_;
1534
1535     my $dbh = C4::Context->dbh;
1536     my $sth = $dbh->prepare_cached("UPDATE import_batches SET
1537                                         num_records = (
1538                                             SELECT COUNT(*)
1539                                             FROM import_records
1540                                             WHERE import_batch_id = import_batches.import_batch_id),
1541                                         num_items = (
1542                                             SELECT COUNT(*)
1543                                             FROM import_records
1544                                             JOIN import_items USING (import_record_id)
1545                                             WHERE import_batch_id = import_batches.import_batch_id
1546                                             AND record_type = 'biblio')
1547                                     WHERE import_batch_id = ?");
1548     $sth->bind_param(1, $batch_id);
1549     $sth->execute();
1550     $sth->finish();
1551 }
1552
1553 sub _get_commit_action {
1554     my ($overlay_action, $nomatch_action, $item_action, $overlay_status, $import_record_id, $record_type) = @_;
1555     
1556     if ($record_type eq 'biblio') {
1557         my ($bib_result, $bib_match, $item_result);
1558
1559         if ($overlay_status ne 'no_match') {
1560             $bib_match = GetBestRecordMatch($import_record_id);
1561             if ($overlay_action eq 'replace') {
1562                 $bib_result  = defined($bib_match) ? 'replace' : 'create_new';
1563             } elsif ($overlay_action eq 'create_new') {
1564                 $bib_result  = 'create_new';
1565             } elsif ($overlay_action eq 'ignore') {
1566                 $bib_result  = 'ignore';
1567             }
1568          if($item_action eq 'always_add' or $item_action eq 'add_only_for_matches'){
1569                 $item_result = 'create_new';
1570        }
1571       elsif($item_action eq 'replace'){
1572           $item_result = 'replace';
1573           }
1574       else {
1575              $item_result = 'ignore';
1576            }
1577         } else {
1578             $bib_result = $nomatch_action;
1579             $item_result = ($item_action eq 'always_add' or $item_action eq 'add_only_for_new')     ? 'create_new' : 'ignore';
1580         }
1581         return ($bib_result, $item_result, $bib_match);
1582     } else { # must be auths
1583         my ($auth_result, $auth_match);
1584
1585         if ($overlay_status ne 'no_match') {
1586             $auth_match = GetBestRecordMatch($import_record_id);
1587             if ($overlay_action eq 'replace') {
1588                 $auth_result  = defined($auth_match) ? 'replace' : 'create_new';
1589             } elsif ($overlay_action eq 'create_new') {
1590                 $auth_result  = 'create_new';
1591             } elsif ($overlay_action eq 'ignore') {
1592                 $auth_result  = 'ignore';
1593             }
1594         } else {
1595             $auth_result = $nomatch_action;
1596         }
1597
1598         return ($auth_result, undef, $auth_match);
1599
1600     }
1601 }
1602
1603 sub _get_revert_action {
1604     my ($overlay_action, $overlay_status, $status) = @_;
1605
1606     my $bib_result;
1607
1608     if ($status eq 'ignored') {
1609         $bib_result = 'ignore';
1610     } else {
1611         if ($overlay_action eq 'create_new') {
1612             $bib_result = 'delete';
1613         } else {
1614             $bib_result = ($overlay_status eq 'match_applied') ? 'restore' : 'delete';
1615         }
1616     }
1617     return $bib_result;
1618 }
1619
1620 1;
1621 __END__
1622
1623 =head1 AUTHOR
1624
1625 Koha Development Team <http://koha-community.org/>
1626
1627 Galen Charlton <galen.charlton@liblime.com>
1628
1629 =cut