Bug 9523: importing staged bib records hangs if a matched bib has been deleted
[koha-equinox.git] / C4 / ImportBatch.pm
1 package C4::ImportBatch;
2
3 # Copyright (C) 2007 LibLime, 2012 C & P Bibliography Services
4 #
5 # This file is part of Koha.
6 #
7 # Koha is free software; you can redistribute it and/or modify it under the
8 # terms of the GNU General Public License as published by the Free Software
9 # Foundation; either version 2 of the License, or (at your option) any later
10 # version.
11 #
12 # Koha is distributed in the hope that it will be useful, but WITHOUT ANY
13 # WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
14 # A PARTICULAR PURPOSE.  See the GNU General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License along
17 # with Koha; if not, write to the Free Software Foundation, Inc.,
18 # 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
19
20 use strict;
21 use warnings;
22
23 use C4::Context;
24 use C4::Koha;
25 use C4::Biblio;
26 use C4::Items;
27 use C4::Charset;
28 use C4::AuthoritiesMarc;
29
30 use vars qw($VERSION @ISA @EXPORT @EXPORT_OK %EXPORT_TAGS);
31
32 BEGIN {
33         # set the version for version checking
34     $VERSION = 3.07.00.049;
35         require Exporter;
36         @ISA    = qw(Exporter);
37         @EXPORT = qw(
38     GetZ3950BatchId
39     GetWebserviceBatchId
40     GetImportRecordMarc
41     GetImportRecordMarcXML
42     AddImportBatch
43     GetImportBatch
44     AddAuthToBatch
45     AddBiblioToBatch
46     AddItemsToImportBiblio
47     ModAuthorityInBatch
48     ModBiblioInBatch
49
50     BatchStageMarcRecords
51     BatchFindDuplicates
52     BatchCommitRecords
53     BatchRevertRecords
54     CleanBatch
55
56     GetAllImportBatches
57     GetStagedWebserviceBatches
58     GetImportBatchRangeDesc
59     GetNumberOfNonZ3950ImportBatches
60     GetImportRecordsRange
61         GetItemNumbersFromImportBatch
62     
63     GetImportBatchStatus
64     SetImportBatchStatus
65     GetImportBatchOverlayAction
66     SetImportBatchOverlayAction
67     GetImportBatchNoMatchAction
68     SetImportBatchNoMatchAction
69     GetImportBatchItemAction
70     SetImportBatchItemAction
71     GetImportBatchMatcher
72     SetImportBatchMatcher
73     GetImportRecordOverlayStatus
74     SetImportRecordOverlayStatus
75     GetImportRecordStatus
76     SetImportRecordStatus
77     GetImportRecordMatches
78     SetImportRecordMatches
79         );
80 }
81
82 =head1 NAME
83
84 C4::ImportBatch - manage batches of imported MARC records
85
86 =head1 SYNOPSIS
87
88 use C4::ImportBatch;
89
90 =head1 FUNCTIONS
91
92 =head2 GetZ3950BatchId
93
94   my $batchid = GetZ3950BatchId($z3950server);
95
96 Retrieves the ID of the import batch for the Z39.50
97 reservoir for the given target.  If necessary,
98 creates the import batch.
99
100 =cut
101
102 sub GetZ3950BatchId {
103     my ($z3950server) = @_;
104
105     my $dbh = C4::Context->dbh;
106     my $sth = $dbh->prepare("SELECT import_batch_id FROM import_batches
107                              WHERE  batch_type = 'z3950'
108                              AND    file_name = ?");
109     $sth->execute($z3950server);
110     my $rowref = $sth->fetchrow_arrayref();
111     $sth->finish();
112     if (defined $rowref) {
113         return $rowref->[0];
114     } else {
115         my $batch_id = AddImportBatch( {
116                 overlay_action => 'create_new',
117                 import_status => 'staged',
118                 batch_type => 'z3950',
119                 file_name => $z3950server,
120             } );
121         return $batch_id;
122     }
123     
124 }
125
126 =head2 GetWebserviceBatchId
127
128   my $batchid = GetWebserviceBatchId();
129
130 Retrieves the ID of the import batch for webservice.
131 If necessary, creates the import batch.
132
133 =cut
134
135 my $WEBSERVICE_BASE_QRY = <<EOQ;
136 SELECT import_batch_id FROM import_batches
137 WHERE  batch_type = 'webservice'
138 AND    import_status = 'staged'
139 EOQ
140 sub GetWebserviceBatchId {
141     my ($params) = @_;
142
143     my $dbh = C4::Context->dbh;
144     my $sql = $WEBSERVICE_BASE_QRY;
145     my @args;
146     foreach my $field (qw(matcher_id overlay_action nomatch_action item_action)) {
147         if (my $val = $params->{$field}) {
148             $sql .= " AND $field = ?";
149             push @args, $val;
150         }
151     }
152     my $id = $dbh->selectrow_array($sql, undef, @args);
153     return $id if $id;
154
155     $params->{batch_type} = 'webservice';
156     $params->{import_status} = 'staged';
157     return AddImportBatch($params);
158 }
159
160 =head2 GetImportRecordMarc
161
162   my ($marcblob, $encoding) = GetImportRecordMarc($import_record_id);
163
164 =cut
165
166 sub GetImportRecordMarc {
167     my ($import_record_id) = @_;
168
169     my $dbh = C4::Context->dbh;
170     my $sth = $dbh->prepare("SELECT marc, encoding FROM import_records WHERE import_record_id = ?");
171     $sth->execute($import_record_id);
172     my ($marc, $encoding) = $sth->fetchrow();
173     $sth->finish();
174     return $marc, $encoding;
175
176 }
177
178 =head2 GetImportRecordMarcXML
179
180   my $marcxml = GetImportRecordMarcXML($import_record_id);
181
182 =cut
183
184 sub GetImportRecordMarcXML {
185     my ($import_record_id) = @_;
186
187     my $dbh = C4::Context->dbh;
188     my $sth = $dbh->prepare("SELECT marcxml FROM import_records WHERE import_record_id = ?");
189     $sth->execute($import_record_id);
190     my ($marcxml) = $sth->fetchrow();
191     $sth->finish();
192     return $marcxml;
193
194 }
195
196 =head2 AddImportBatch
197
198   my $batch_id = AddImportBatch($params_hash);
199
200 =cut
201
202 sub AddImportBatch {
203     my ($params) = @_;
204
205     my (@fields, @vals);
206     foreach (qw( matcher_id template_id branchcode
207                  overlay_action nomatch_action item_action
208                  import_status batch_type file_name comments record_type )) {
209         if (exists $params->{$_}) {
210             push @fields, $_;
211             push @vals, $params->{$_};
212         }
213     }
214     my $dbh = C4::Context->dbh;
215     $dbh->do("INSERT INTO import_batches (".join( ',', @fields).")
216                                   VALUES (".join( ',', map '?', @fields).")",
217              undef,
218              @vals);
219     return $dbh->{'mysql_insertid'};
220 }
221
222 =head2 GetImportBatch 
223
224   my $row = GetImportBatch($batch_id);
225
226 Retrieve a hashref of an import_batches row.
227
228 =cut
229
230 sub GetImportBatch {
231     my ($batch_id) = @_;
232
233     my $dbh = C4::Context->dbh;
234     my $sth = $dbh->prepare_cached("SELECT * FROM import_batches WHERE import_batch_id = ?");
235     $sth->bind_param(1, $batch_id);
236     $sth->execute();
237     my $result = $sth->fetchrow_hashref;
238     $sth->finish();
239     return $result;
240
241 }
242
243 =head2 AddBiblioToBatch 
244
245   my $import_record_id = AddBiblioToBatch($batch_id, $record_sequence, 
246                 $marc_record, $encoding, $z3950random, $update_counts);
247
248 =cut
249
250 sub AddBiblioToBatch {
251     my $batch_id = shift;
252     my $record_sequence = shift;
253     my $marc_record = shift;
254     my $encoding = shift;
255     my $z3950random = shift;
256     my $update_counts = @_ ? shift : 1;
257
258     my $import_record_id = _create_import_record($batch_id, $record_sequence, $marc_record, 'biblio', $encoding, $z3950random, C4::Context->preference('marcflavour'));
259     _add_biblio_fields($import_record_id, $marc_record);
260     _update_batch_record_counts($batch_id) if $update_counts;
261     return $import_record_id;
262 }
263
264 =head2 ModBiblioInBatch
265
266   ModBiblioInBatch($import_record_id, $marc_record);
267
268 =cut
269
270 sub ModBiblioInBatch {
271     my ($import_record_id, $marc_record) = @_;
272
273     _update_import_record_marc($import_record_id, $marc_record, C4::Context->preference('marcflavour'));
274     _update_biblio_fields($import_record_id, $marc_record);
275
276 }
277
278 =head2 AddAuthToBatch
279
280   my $import_record_id = AddAuthToBatch($batch_id, $record_sequence,
281                 $marc_record, $encoding, $z3950random, $update_counts, [$marc_type]);
282
283 =cut
284
285 sub AddAuthToBatch {
286     my $batch_id = shift;
287     my $record_sequence = shift;
288     my $marc_record = shift;
289     my $encoding = shift;
290     my $z3950random = shift;
291     my $update_counts = @_ ? shift : 1;
292     my $marc_type = shift || C4::Context->preference('marcflavour');
293
294     $marc_type = 'UNIMARCAUTH' if $marc_type eq 'UNIMARC';
295
296     my $import_record_id = _create_import_record($batch_id, $record_sequence, $marc_record, 'auth', $encoding, $z3950random, $marc_type);
297     _add_auth_fields($import_record_id, $marc_record);
298     _update_batch_record_counts($batch_id) if $update_counts;
299     return $import_record_id;
300 }
301
302 =head2 ModAuthInBatch
303
304   ModAuthInBatch($import_record_id, $marc_record);
305
306 =cut
307
308 sub ModAuthInBatch {
309     my ($import_record_id, $marc_record) = @_;
310
311     my $marcflavour = C4::Context->preference('marcflavour');
312     _update_import_record_marc($import_record_id, $marc_record, $marcflavour eq 'UNIMARC' ? 'UNIMARCAUTH' : 'USMARC');
313
314 }
315
316 =head2 BatchStageMarcRecords
317
318   ($batch_id, $num_records, $num_items, @invalid_records) = 
319     BatchStageMarcRecords($record_type, $encoding, $marc_records, $file_name,
320                           $comments, $branch_code, $parse_items,
321                           $leave_as_staging, 
322                           $progress_interval, $progress_callback);
323
324 =cut
325
326 sub  BatchStageMarcRecords {
327     my $record_type = shift;
328     my $encoding = shift;
329     my $marc_records = shift;
330     my $file_name = shift;
331     my $comments = shift;
332     my $branch_code = shift;
333     my $parse_items = shift;
334     my $leave_as_staging = shift;
335
336     # optional callback to monitor status 
337     # of job
338     my $progress_interval = 0;
339     my $progress_callback = undef;
340     if ($#_ == 1) {
341         $progress_interval = shift;
342         $progress_callback = shift;
343         $progress_interval = 0 unless $progress_interval =~ /^\d+$/ and $progress_interval > 0;
344         $progress_interval = 0 unless 'CODE' eq ref $progress_callback;
345     } 
346     
347     my $batch_id = AddImportBatch( {
348             overlay_action => 'create_new',
349             import_status => 'staging',
350             batch_type => 'batch',
351             file_name => $file_name,
352             comments => $comments,
353             record_type => $record_type,
354         } );
355     if ($parse_items) {
356         SetImportBatchItemAction($batch_id, 'always_add');
357     } else {
358         SetImportBatchItemAction($batch_id, 'ignore');
359     }
360
361     my $marc_type = C4::Context->preference('marcflavour');
362     $marc_type .= 'AUTH' if ($marc_type eq 'UNIMARC' && $record_type eq 'auth');
363     my @invalid_records = ();
364     my $num_valid = 0;
365     my $num_items = 0;
366     # FIXME - for now, we're dealing only with bibs
367     my $rec_num = 0;
368     foreach my $marc_blob (split(/\x1D/, $marc_records)) {
369         $marc_blob =~ s/^\s+//g;
370         $marc_blob =~ s/\s+$//g;
371         next unless $marc_blob;
372         $rec_num++;
373         if ($progress_interval and (0 == ($rec_num % $progress_interval))) {
374             &$progress_callback($rec_num);
375         }
376         my ($marc_record, $charset_guessed, $char_errors) =
377             MarcToUTF8Record($marc_blob, $marc_type, $encoding);
378
379         $encoding = $charset_guessed unless $encoding;
380
381         my $import_record_id;
382         if (scalar($marc_record->fields()) == 0) {
383             push @invalid_records, $marc_blob;
384         } else {
385
386             # Normalize the record so it doesn't have separated diacritics
387             SetUTF8Flag($marc_record);
388
389             $num_valid++;
390             if ($record_type eq 'biblio') {
391                 $import_record_id = AddBiblioToBatch($batch_id, $rec_num, $marc_record, $encoding, int(rand(99999)), 0);
392                 if ($parse_items) {
393                     my @import_items_ids = AddItemsToImportBiblio($batch_id, $import_record_id, $marc_record, 0);
394                     $num_items += scalar(@import_items_ids);
395                 }
396             } elsif ($record_type eq 'auth') {
397                 $import_record_id = AddAuthToBatch($batch_id, $rec_num, $marc_record, $encoding, int(rand(99999)), 0, $marc_type);
398             }
399         }
400     }
401     unless ($leave_as_staging) {
402         SetImportBatchStatus($batch_id, 'staged');
403     }
404     # FIXME branch_code, number of bibs, number of items
405     _update_batch_record_counts($batch_id);
406     return ($batch_id, $num_valid, $num_items, @invalid_records);
407 }
408
409 =head2 AddItemsToImportBiblio
410
411   my @import_items_ids = AddItemsToImportBiblio($batch_id, 
412                 $import_record_id, $marc_record, $update_counts);
413
414 =cut
415
416 sub AddItemsToImportBiblio {
417     my $batch_id = shift;
418     my $import_record_id = shift;
419     my $marc_record = shift;
420     my $update_counts = @_ ? shift : 0;
421
422     my @import_items_ids = ();
423    
424     my $dbh = C4::Context->dbh; 
425     my ($item_tag,$item_subfield) = &GetMarcFromKohaField("items.itemnumber",'');
426     foreach my $item_field ($marc_record->field($item_tag)) {
427         my $item_marc = MARC::Record->new();
428         $item_marc->leader("00000    a              "); # must set Leader/09 to 'a'
429         $item_marc->append_fields($item_field);
430         $marc_record->delete_field($item_field);
431         my $sth = $dbh->prepare_cached("INSERT INTO import_items (import_record_id, status, marcxml)
432                                         VALUES (?, ?, ?)");
433         $sth->bind_param(1, $import_record_id);
434         $sth->bind_param(2, 'staged');
435         $sth->bind_param(3, $item_marc->as_xml());
436         $sth->execute();
437         push @import_items_ids, $dbh->{'mysql_insertid'};
438         $sth->finish();
439     }
440
441     if ($#import_items_ids > -1) {
442         _update_batch_record_counts($batch_id) if $update_counts;
443         _update_import_record_marc($import_record_id, $marc_record, C4::Context->preference('marcflavour'));
444     }
445     return @import_items_ids;
446 }
447
448 =head2 BatchFindDuplicates
449
450   my $num_with_matches = BatchFindDuplicates($batch_id, $matcher,
451              $max_matches, $progress_interval, $progress_callback);
452
453 Goes through the records loaded in the batch and attempts to 
454 find duplicates for each one.  Sets the matching status 
455 of each record to "no_match" or "auto_match" as appropriate.
456
457 The $max_matches parameter is optional; if it is not supplied,
458 it defaults to 10.
459
460 The $progress_interval and $progress_callback parameters are 
461 optional; if both are supplied, the sub referred to by
462 $progress_callback will be invoked every $progress_interval
463 records using the number of records processed as the 
464 singular argument.
465
466 =cut
467
468 sub BatchFindDuplicates {
469     my $batch_id = shift;
470     my $matcher = shift;
471     my $max_matches = @_ ? shift : 10;
472
473     # optional callback to monitor status 
474     # of job
475     my $progress_interval = 0;
476     my $progress_callback = undef;
477     if ($#_ == 1) {
478         $progress_interval = shift;
479         $progress_callback = shift;
480         $progress_interval = 0 unless $progress_interval =~ /^\d+$/ and $progress_interval > 0;
481         $progress_interval = 0 unless 'CODE' eq ref $progress_callback;
482     }
483
484     my $dbh = C4::Context->dbh;
485
486     my $sth = $dbh->prepare("SELECT import_record_id, record_type, marc
487                              FROM import_records
488                              WHERE import_batch_id = ?");
489     $sth->execute($batch_id);
490     my $num_with_matches = 0;
491     my $rec_num = 0;
492     while (my $rowref = $sth->fetchrow_hashref) {
493         $rec_num++;
494         if ($progress_interval and (0 == ($rec_num % $progress_interval))) {
495             &$progress_callback($rec_num);
496         }
497         my $marc_record = MARC::Record->new_from_usmarc($rowref->{'marc'});
498         my @matches = ();
499         if (defined $matcher) {
500             @matches = $matcher->get_matches($marc_record, $max_matches);
501         }
502         if (scalar(@matches) > 0) {
503             $num_with_matches++;
504             SetImportRecordMatches($rowref->{'import_record_id'}, @matches);
505             SetImportRecordOverlayStatus($rowref->{'import_record_id'}, 'auto_match');
506         } else {
507             SetImportRecordMatches($rowref->{'import_record_id'}, ());
508             SetImportRecordOverlayStatus($rowref->{'import_record_id'}, 'no_match');
509         }
510     }
511     $sth->finish();
512     return $num_with_matches;
513 }
514
515 =head2 BatchCommitRecords
516
517   my ($num_added, $num_updated, $num_items_added, $num_items_errored, $num_ignored) =
518         BatchCommitRecords($batch_id, $framework,
519         $progress_interval, $progress_callback);
520
521 =cut
522
523 sub BatchCommitRecords {
524     my $batch_id = shift;
525     my $framework = shift;
526
527     # optional callback to monitor status 
528     # of job
529     my $progress_interval = 0;
530     my $progress_callback = undef;
531     if ($#_ == 1) {
532         $progress_interval = shift;
533         $progress_callback = shift;
534         $progress_interval = 0 unless $progress_interval =~ /^\d+$/ and $progress_interval > 0;
535         $progress_interval = 0 unless 'CODE' eq ref $progress_callback;
536     }
537
538     my $record_type;
539     my $num_added = 0;
540     my $num_updated = 0;
541     my $num_items_added = 0;
542     my $num_items_errored = 0;
543     my $num_ignored = 0;
544     # commit (i.e., save, all records in the batch)
545     SetImportBatchStatus('importing');
546     my $overlay_action = GetImportBatchOverlayAction($batch_id);
547     my $nomatch_action = GetImportBatchNoMatchAction($batch_id);
548     my $item_action = GetImportBatchItemAction($batch_id);
549     my $item_tag;
550     my $item_subfield;
551     my $dbh = C4::Context->dbh;
552     my $sth = $dbh->prepare("SELECT import_records.import_record_id, record_type, status, overlay_status, marc, encoding
553                              FROM import_records
554                              LEFT JOIN import_auths ON (import_records.import_record_id=import_auths.import_record_id)
555                              LEFT JOIN import_biblios ON (import_records.import_record_id=import_biblios.import_record_id)
556                              WHERE import_batch_id = ?");
557     $sth->execute($batch_id);
558     my $marcflavour = C4::Context->preference('marcflavour');
559     my $rec_num = 0;
560     while (my $rowref = $sth->fetchrow_hashref) {
561         $record_type = $rowref->{'record_type'};
562         $rec_num++;
563         if ($progress_interval and (0 == ($rec_num % $progress_interval))) {
564             &$progress_callback($rec_num);
565         }
566         if ($rowref->{'status'} eq 'error' or $rowref->{'status'} eq 'imported') {
567             $num_ignored++;
568             next;
569         }
570
571         my $marc_type;
572         if ($marcflavour eq 'UNIMARC' && $record_type eq 'auth') {
573             $marc_type = 'UNIMARCAUTH';
574         } elsif ($marcflavour eq 'UNIMARC') {
575             $marc_type = 'UNIMARC';
576         } else {
577             $marc_type = 'USMARC';
578         }
579         my $marc_record = MARC::Record->new_from_usmarc($rowref->{'marc'});
580
581         if ($record_type eq 'biblio') {
582             # remove any item tags - rely on BatchCommitItems
583             ($item_tag,$item_subfield) = &GetMarcFromKohaField("items.itemnumber",'');
584             foreach my $item_field ($marc_record->field($item_tag)) {
585                 $marc_record->delete_field($item_field);
586             }
587         }
588
589         my ($record_result, $item_result, $record_match) =
590             _get_commit_action($overlay_action, $nomatch_action, $item_action, 
591                                $rowref->{'overlay_status'}, $rowref->{'import_record_id'}, $record_type);
592
593         my $recordid;
594         my $query;
595         if ($record_result eq 'create_new') {
596             $num_added++;
597             if ($record_type eq 'biblio') {
598                 my $biblioitemnumber;
599                 ($recordid, $biblioitemnumber) = AddBiblio($marc_record, $framework);
600                 $query = "UPDATE import_biblios SET matched_biblionumber = ? WHERE import_record_id = ?";
601                 if ($item_result eq 'create_new') {
602                     my ($bib_items_added, $bib_items_errored) = BatchCommitItems($rowref->{'import_record_id'}, $recordid);
603                     $num_items_added += $bib_items_added;
604                     $num_items_errored += $bib_items_errored;
605                 }
606             } else {
607                 $recordid = AddAuthority($marc_record, undef, GuessAuthTypeCode($marc_record));
608                 $query = "UPDATE import_auths SET matched_authid = ? WHERE import_record_id = ?";
609             }
610             my $sth = $dbh->prepare_cached($query);
611             $sth->execute($recordid, $rowref->{'import_record_id'});
612             $sth->finish();
613             SetImportRecordStatus($rowref->{'import_record_id'}, 'imported');
614         } elsif ($record_result eq 'replace') {
615             $num_updated++;
616             $recordid = $record_match;
617             my $oldxml;
618             if ($record_type eq 'biblio') {
619                 my ($count, $oldbiblio) = GetBiblio($recordid);
620                 $oldxml = GetXmlBiblio($recordid);
621
622                 # remove item fields so that they don't get
623                 # added again if record is reverted
624                 # FIXME: GetXmlBiblio output should not contain item info any more! So the next foreach should not be needed. Does not hurt either; may remove old 952s that should not have been there anymore.
625                 my $old_marc = MARC::Record->new_from_xml(StripNonXmlChars($oldxml), 'UTF-8', $rowref->{'encoding'}, $marc_type);
626                 foreach my $item_field ($old_marc->field($item_tag)) {
627                     $old_marc->delete_field($item_field);
628                 }
629                 $oldxml = $old_marc->as_xml($marc_type);
630
631                 ModBiblio($marc_record, $recordid, $oldbiblio->{'frameworkcode'});
632                 $query = "UPDATE import_biblios SET matched_biblionumber = ? WHERE import_record_id = ?";
633
634                 if ($item_result eq 'create_new') {
635                     my ($bib_items_added, $bib_items_errored) = BatchCommitItems($rowref->{'import_record_id'}, $recordid);
636                     $num_items_added += $bib_items_added;
637                     $num_items_errored += $bib_items_errored;
638                 }
639             } else {
640                 $oldxml = GetAuthorityXML($recordid);
641
642                 ModAuthority($recordid, $marc_record, GuessAuthTypeCode($marc_record));
643                 $query = "UPDATE import_auths SET matched_authid = ? WHERE import_record_id = ?";
644             }
645             my $sth = $dbh->prepare_cached("UPDATE import_records SET marcxml_old = ? WHERE import_record_id = ?");
646             $sth->execute($oldxml, $rowref->{'import_record_id'});
647             $sth->finish();
648             my $sth2 = $dbh->prepare_cached($query);
649             $sth2->execute($recordid, $rowref->{'import_record_id'});
650             $sth2->finish();
651             SetImportRecordOverlayStatus($rowref->{'import_record_id'}, 'match_applied');
652             SetImportRecordStatus($rowref->{'import_record_id'}, 'imported');
653         } elsif ($record_result eq 'ignore') {
654             $num_ignored++;
655             $recordid = $record_match;
656             if ($record_type eq 'biblio' and defined $recordid and $item_result eq 'create_new') {
657                 my ($bib_items_added, $bib_items_errored) = BatchCommitItems($rowref->{'import_record_id'}, $recordid);
658                 $num_items_added += $bib_items_added;
659                 $num_items_errored += $bib_items_errored;
660                 # still need to record the matched biblionumber so that the
661                 # items can be reverted
662                 my $sth2 = $dbh->prepare_cached("UPDATE import_biblios SET matched_biblionumber = ? WHERE import_record_id = ?");
663                 $sth2->execute($recordid, $rowref->{'import_record_id'});
664                 SetImportRecordOverlayStatus($rowref->{'import_record_id'}, 'match_applied');
665             }
666             SetImportRecordStatus($rowref->{'import_record_id'}, 'ignored');
667         }
668     }
669     $sth->finish();
670     SetImportBatchStatus($batch_id, 'imported');
671     return ($num_added, $num_updated, $num_items_added, $num_items_errored, $num_ignored);
672 }
673
674 =head2 BatchCommitItems
675
676   ($num_items_added, $num_items_errored) = 
677          BatchCommitItems($import_record_id, $biblionumber);
678
679 =cut
680
681 sub BatchCommitItems {
682     my ($import_record_id, $biblionumber) = @_;
683
684     my $dbh = C4::Context->dbh;
685
686     my $num_items_added = 0;
687     my $num_items_errored = 0;
688     my $sth = $dbh->prepare("SELECT import_items_id, import_items.marcxml, encoding
689                              FROM import_items
690                              JOIN import_records USING (import_record_id)
691                              WHERE import_record_id = ?
692                              ORDER BY import_items_id");
693     $sth->bind_param(1, $import_record_id);
694     $sth->execute();
695     while (my $row = $sth->fetchrow_hashref()) {
696         my $item_marc = MARC::Record->new_from_xml(StripNonXmlChars($row->{'marcxml'}), 'UTF-8', $row->{'encoding'});
697         # FIXME - duplicate barcode check needs to become part of AddItemFromMarc()
698         my $item = TransformMarcToKoha($dbh, $item_marc);
699         my $duplicate_barcode = exists($item->{'barcode'}) && GetItemnumberFromBarcode($item->{'barcode'});
700         if ($duplicate_barcode) {
701             my $updsth = $dbh->prepare("UPDATE import_items SET status = ?, import_error = ? WHERE import_items_id = ?");
702             $updsth->bind_param(1, 'error');
703             $updsth->bind_param(2, 'duplicate item barcode');
704             $updsth->bind_param(3, $row->{'import_items_id'});
705             $updsth->execute();
706             $num_items_errored++;
707         } else {
708             my ($item_biblionumber, $biblioitemnumber, $itemnumber) = AddItemFromMarc($item_marc, $biblionumber);
709             my $updsth = $dbh->prepare("UPDATE import_items SET status = ?, itemnumber = ? WHERE import_items_id = ?");
710             $updsth->bind_param(1, 'imported');
711             $updsth->bind_param(2, $itemnumber);
712             $updsth->bind_param(3, $row->{'import_items_id'});
713             $updsth->execute();
714             $updsth->finish();
715             $num_items_added++;
716         }
717     }
718     $sth->finish();
719     return ($num_items_added, $num_items_errored);
720 }
721
722 =head2 BatchRevertRecords
723
724   my ($num_deleted, $num_errors, $num_reverted, $num_items_deleted, 
725       $num_ignored) = BatchRevertRecords($batch_id);
726
727 =cut
728
729 sub BatchRevertRecords {
730     my $batch_id = shift;
731
732     my $record_type;
733     my $num_deleted = 0;
734     my $num_errors = 0;
735     my $num_reverted = 0;
736     my $num_ignored = 0;
737     my $num_items_deleted = 0;
738     # commit (i.e., save, all records in the batch)
739     SetImportBatchStatus('reverting');
740     my $overlay_action = GetImportBatchOverlayAction($batch_id);
741     my $nomatch_action = GetImportBatchNoMatchAction($batch_id);
742     my $dbh = C4::Context->dbh;
743     my $sth = $dbh->prepare("SELECT import_records.import_record_id, record_type, status, overlay_status, marcxml_old, encoding, matched_biblionumber, matched_authid
744                              FROM import_records
745                              LEFT JOIN import_auths ON (import_records.import_record_id=import_auths.import_record_id)
746                              LEFT JOIN import_biblios ON (import_records.import_record_id=import_biblios.import_record_id)
747                              WHERE import_batch_id = ?");
748     $sth->execute($batch_id);
749     my $marc_type;
750     my $marcflavour = C4::Context->preference('marcflavour');
751     while (my $rowref = $sth->fetchrow_hashref) {
752         $record_type = $rowref->{'record_type'};
753         if ($rowref->{'status'} eq 'error' or $rowref->{'status'} eq 'reverted') {
754             $num_ignored++;
755             next;
756         }
757         if ($marcflavour eq 'UNIMARC' && $record_type eq 'auth') {
758             $marc_type = 'UNIMARCAUTH';
759         } elsif ($marcflavour eq 'UNIMARC') {
760             $marc_type = 'UNIMARC';
761         } else {
762             $marc_type = 'USMARC';
763         }
764
765         my $record_result = _get_revert_action($overlay_action, $rowref->{'overlay_status'}, $rowref->{'status'});
766
767         if ($record_result eq 'delete') {
768             my $error = undef;
769             if  ($record_type eq 'biblio') {
770                 $num_items_deleted += BatchRevertItems($rowref->{'import_record_id'}, $rowref->{'matched_biblionumber'});
771                 $error = DelBiblio($rowref->{'matched_biblionumber'});
772             } else {
773                 my $deletedauthid = DelAuthority($rowref->{'matched_authid'});
774             }
775             if (defined $error) {
776                 $num_errors++;
777             } else {
778                 $num_deleted++;
779                 SetImportRecordStatus($rowref->{'import_record_id'}, 'reverted');
780             }
781         } elsif ($record_result eq 'restore') {
782             $num_reverted++;
783             my $old_record = MARC::Record->new_from_xml(StripNonXmlChars($rowref->{'marcxml_old'}), 'UTF-8', $rowref->{'encoding'}, $marc_type);
784             if ($record_type eq 'biblio') {
785                 my $biblionumber = $rowref->{'matched_biblionumber'};
786                 my ($count, $oldbiblio) = GetBiblio($biblionumber);
787                 $num_items_deleted += BatchRevertItems($rowref->{'import_record_id'}, $rowref->{'matched_biblionumber'});
788                 ModBiblio($old_record, $biblionumber, $oldbiblio->{'frameworkcode'});
789             } else {
790                 my $authid = $rowref->{'matched_authid'};
791                 ModAuthority($authid, $old_record, GuessAuthTypeCode($old_record));
792             }
793             SetImportRecordStatus($rowref->{'import_record_id'}, 'reverted');
794         } elsif ($record_result eq 'ignore') {
795             if ($record_type eq 'biblio') {
796                 $num_items_deleted += BatchRevertItems($rowref->{'import_record_id'}, $rowref->{'matched_biblionumber'});
797             }
798             SetImportRecordStatus($rowref->{'import_record_id'}, 'reverted');
799         }
800         my $query;
801         if ($record_type eq 'biblio') {
802             # remove matched_biblionumber only if there is no 'imported' item left
803             $query = "UPDATE import_biblios SET matched_biblionumber = NULL WHERE import_record_id = ?";
804             $query = "UPDATE import_biblios SET matched_biblionumber = NULL WHERE import_record_id = ?  AND NOT EXISTS (SELECT * FROM import_items WHERE import_items.import_record_id=import_biblios.import_record_id and status='imported')";
805         } else {
806             $query = "UPDATE import_auths SET matched_authid = NULL WHERE import_record_id = ?";
807         }
808         my $sth2 = $dbh->prepare_cached($query);
809         $sth2->execute($rowref->{'import_record_id'});
810     }
811
812     $sth->finish();
813     SetImportBatchStatus($batch_id, 'reverted');
814     return ($num_deleted, $num_errors, $num_reverted, $num_items_deleted, $num_ignored);
815 }
816
817 =head2 BatchRevertItems
818
819   my $num_items_deleted = BatchRevertItems($import_record_id, $biblionumber);
820
821 =cut
822
823 sub BatchRevertItems {
824     my ($import_record_id, $biblionumber) = @_;
825
826     my $dbh = C4::Context->dbh;
827     my $num_items_deleted = 0;
828
829     my $sth = $dbh->prepare_cached("SELECT import_items_id, itemnumber
830                                    FROM import_items
831                                    JOIN items USING (itemnumber)
832                                    WHERE import_record_id = ?");
833     $sth->bind_param(1, $import_record_id);
834     $sth->execute();
835     while (my $row = $sth->fetchrow_hashref()) {
836         my $error = DelItemCheck($dbh, $biblionumber, $row->{'itemnumber'});
837         if ($error == 1){
838             my $updsth = $dbh->prepare("UPDATE import_items SET status = ? WHERE import_items_id = ?");
839             $updsth->bind_param(1, 'reverted');
840             $updsth->bind_param(2, $row->{'import_items_id'});
841             $updsth->execute();
842             $updsth->finish();
843             $num_items_deleted++;
844         }
845         else {
846             next;
847         }
848     }
849     $sth->finish();
850     return $num_items_deleted;
851 }
852
853 =head2 CleanBatch
854
855   CleanBatch($batch_id)
856
857 Deletes all staged records from the import batch
858 and sets the status of the batch to 'cleaned'.  Note
859 that deleting a stage record does *not* affect
860 any record that has been committed to the database.
861
862 =cut
863
864 sub CleanBatch {
865     my $batch_id = shift;
866     return unless defined $batch_id;
867
868     C4::Context->dbh->do('DELETE FROM import_records WHERE import_batch_id = ?', {}, $batch_id);
869     SetImportBatchStatus($batch_id, 'cleaned');
870 }
871
872 =head2 GetAllImportBatches
873
874   my $results = GetAllImportBatches();
875
876 Returns a references to an array of hash references corresponding
877 to all import_batches rows (of batch_type 'batch'), sorted in 
878 ascending order by import_batch_id.
879
880 =cut
881
882 sub  GetAllImportBatches {
883     my $dbh = C4::Context->dbh;
884     my $sth = $dbh->prepare_cached("SELECT * FROM import_batches
885                                     WHERE batch_type IN ('batch', 'webservice')
886                                     ORDER BY import_batch_id ASC");
887
888     my $results = [];
889     $sth->execute();
890     while (my $row = $sth->fetchrow_hashref) {
891         push @$results, $row;
892     }
893     $sth->finish();
894     return $results;
895 }
896
897 =head2 GetStagedWebserviceBatches
898
899   my $batch_ids = GetStagedWebserviceBatches();
900
901 Returns a references to an array of batch id's
902 of batch_type 'webservice' that are not imported
903
904 =cut
905
906 my $PENDING_WEBSERVICE_BATCHES_QRY = <<EOQ;
907 SELECT import_batch_id FROM import_batches
908 WHERE batch_type = 'webservice'
909 AND import_status = 'staged'
910 EOQ
911 sub  GetStagedWebserviceBatches {
912     my $dbh = C4::Context->dbh;
913     return $dbh->selectcol_arrayref($PENDING_WEBSERVICE_BATCHES_QRY);
914 }
915
916 =head2 GetImportBatchRangeDesc
917
918   my $results = GetImportBatchRangeDesc($offset, $results_per_group);
919
920 Returns a reference to an array of hash references corresponding to
921 import_batches rows (sorted in descending order by import_batch_id)
922 start at the given offset.
923
924 =cut
925
926 sub GetImportBatchRangeDesc {
927     my ($offset, $results_per_group) = @_;
928
929     my $dbh = C4::Context->dbh;
930     my $query = "SELECT * FROM import_batches
931                                     WHERE batch_type IN ('batch', 'webservice')
932                                     ORDER BY import_batch_id DESC";
933     my @params;
934     if ($results_per_group){
935         $query .= " LIMIT ?";
936         push(@params, $results_per_group);
937     }
938     if ($offset){
939         $query .= " OFFSET ?";
940         push(@params, $offset);
941     }
942     my $sth = $dbh->prepare_cached($query);
943     $sth->execute(@params);
944     my $results = $sth->fetchall_arrayref({});
945     $sth->finish();
946     return $results;
947 }
948
949 =head2 GetItemNumbersFromImportBatch
950
951   my @itemsnos = GetItemNumbersFromImportBatch($batch_id);
952
953 =cut
954
955 sub GetItemNumbersFromImportBatch {
956         my ($batch_id) = @_;
957         my $dbh = C4::Context->dbh;
958         my $sth = $dbh->prepare("SELECT itemnumber FROM import_batches,import_records,import_items WHERE import_batches.import_batch_id=import_records.import_batch_id AND import_records.import_record_id=import_items.import_record_id AND import_batches.import_batch_id=?");
959         $sth->execute($batch_id);
960         my @items ;
961         while ( my ($itm) = $sth->fetchrow_array ) {
962                 push @items, $itm;
963         }
964         return @items;
965 }
966
967 =head2 GetNumberOfImportBatches 
968
969   my $count = GetNumberOfImportBatches();
970
971 =cut
972
973 sub GetNumberOfNonZ3950ImportBatches {
974     my $dbh = C4::Context->dbh;
975     my $sth = $dbh->prepare("SELECT COUNT(*) FROM import_batches WHERE batch_type != 'z3950'");
976     $sth->execute();
977     my ($count) = $sth->fetchrow_array();
978     $sth->finish();
979     return $count;
980 }
981
982 =head2 GetImportRecordsRange
983
984   my $results = GetImportRecordsRange($batch_id, $offset, $results_per_group);
985
986 Returns a reference to an array of hash references corresponding to
987 import_biblios/import_auths/import_records rows for a given batch
988 starting at the given offset.
989
990 =cut
991
992 sub GetImportRecordsRange {
993     my ($batch_id, $offset, $results_per_group, $status) = @_;
994
995     my $dbh = C4::Context->dbh;
996     my $query = "SELECT title, author, isbn, issn, authorized_heading, import_records.import_record_id,
997                                            record_sequence, status, overlay_status,
998                                            matched_biblionumber, matched_authid, record_type
999                                     FROM   import_records
1000                                     LEFT JOIN import_auths ON (import_records.import_record_id=import_auths.import_record_id)
1001                                     LEFT JOIN import_biblios ON (import_records.import_record_id=import_biblios.import_record_id)
1002                                     WHERE  import_batch_id = ?";
1003     my @params;
1004     push(@params, $batch_id);
1005     if ($status) {
1006         $query .= " AND status=?";
1007         push(@params,$status);
1008     }
1009     $query.=" ORDER BY import_record_id";
1010
1011     if($results_per_group){
1012         $query .= " LIMIT ?";
1013         push(@params, $results_per_group);
1014     }
1015     if($offset){
1016         $query .= " OFFSET ?";
1017         push(@params, $offset);
1018     }
1019     my $sth = $dbh->prepare_cached($query);
1020     $sth->execute(@params);
1021     my $results = $sth->fetchall_arrayref({});
1022     $sth->finish();
1023     return $results;
1024
1025 }
1026
1027 =head2 GetBestRecordMatch
1028
1029   my $record_id = GetBestRecordMatch($import_record_id);
1030
1031 =cut
1032
1033 sub GetBestRecordMatch {
1034     my ($import_record_id) = @_;
1035
1036     my $dbh = C4::Context->dbh;
1037     my $sth = $dbh->prepare("SELECT candidate_match_id
1038                              FROM   import_record_matches
1039                              JOIN biblio ON ( candidate_match_id = biblionumber )
1040                              WHERE  import_record_id = ?
1041                              ORDER BY score DESC, candidate_match_id DESC");
1042     $sth->execute($import_record_id);
1043     my ($record_id) = $sth->fetchrow_array();
1044     $sth->finish();
1045     return $record_id;
1046 }
1047
1048 =head2 GetImportBatchStatus
1049
1050   my $status = GetImportBatchStatus($batch_id);
1051
1052 =cut
1053
1054 sub GetImportBatchStatus {
1055     my ($batch_id) = @_;
1056
1057     my $dbh = C4::Context->dbh;
1058     my $sth = $dbh->prepare("SELECT import_status FROM import_batches WHERE import_batch_id = ?");
1059     $sth->execute($batch_id);
1060     my ($status) = $sth->fetchrow_array();
1061     $sth->finish();
1062     return $status;
1063
1064 }
1065
1066 =head2 SetImportBatchStatus
1067
1068   SetImportBatchStatus($batch_id, $new_status);
1069
1070 =cut
1071
1072 sub SetImportBatchStatus {
1073     my ($batch_id, $new_status) = @_;
1074
1075     my $dbh = C4::Context->dbh;
1076     my $sth = $dbh->prepare("UPDATE import_batches SET import_status = ? WHERE import_batch_id = ?");
1077     $sth->execute($new_status, $batch_id);
1078     $sth->finish();
1079
1080 }
1081
1082 =head2 GetImportBatchOverlayAction
1083
1084   my $overlay_action = GetImportBatchOverlayAction($batch_id);
1085
1086 =cut
1087
1088 sub GetImportBatchOverlayAction {
1089     my ($batch_id) = @_;
1090
1091     my $dbh = C4::Context->dbh;
1092     my $sth = $dbh->prepare("SELECT overlay_action FROM import_batches WHERE import_batch_id = ?");
1093     $sth->execute($batch_id);
1094     my ($overlay_action) = $sth->fetchrow_array();
1095     $sth->finish();
1096     return $overlay_action;
1097
1098 }
1099
1100
1101 =head2 SetImportBatchOverlayAction
1102
1103   SetImportBatchOverlayAction($batch_id, $new_overlay_action);
1104
1105 =cut
1106
1107 sub SetImportBatchOverlayAction {
1108     my ($batch_id, $new_overlay_action) = @_;
1109
1110     my $dbh = C4::Context->dbh;
1111     my $sth = $dbh->prepare("UPDATE import_batches SET overlay_action = ? WHERE import_batch_id = ?");
1112     $sth->execute($new_overlay_action, $batch_id);
1113     $sth->finish();
1114
1115 }
1116
1117 =head2 GetImportBatchNoMatchAction
1118
1119   my $nomatch_action = GetImportBatchNoMatchAction($batch_id);
1120
1121 =cut
1122
1123 sub GetImportBatchNoMatchAction {
1124     my ($batch_id) = @_;
1125
1126     my $dbh = C4::Context->dbh;
1127     my $sth = $dbh->prepare("SELECT nomatch_action FROM import_batches WHERE import_batch_id = ?");
1128     $sth->execute($batch_id);
1129     my ($nomatch_action) = $sth->fetchrow_array();
1130     $sth->finish();
1131     return $nomatch_action;
1132
1133 }
1134
1135
1136 =head2 SetImportBatchNoMatchAction
1137
1138   SetImportBatchNoMatchAction($batch_id, $new_nomatch_action);
1139
1140 =cut
1141
1142 sub SetImportBatchNoMatchAction {
1143     my ($batch_id, $new_nomatch_action) = @_;
1144
1145     my $dbh = C4::Context->dbh;
1146     my $sth = $dbh->prepare("UPDATE import_batches SET nomatch_action = ? WHERE import_batch_id = ?");
1147     $sth->execute($new_nomatch_action, $batch_id);
1148     $sth->finish();
1149
1150 }
1151
1152 =head2 GetImportBatchItemAction
1153
1154   my $item_action = GetImportBatchItemAction($batch_id);
1155
1156 =cut
1157
1158 sub GetImportBatchItemAction {
1159     my ($batch_id) = @_;
1160
1161     my $dbh = C4::Context->dbh;
1162     my $sth = $dbh->prepare("SELECT item_action FROM import_batches WHERE import_batch_id = ?");
1163     $sth->execute($batch_id);
1164     my ($item_action) = $sth->fetchrow_array();
1165     $sth->finish();
1166     return $item_action;
1167
1168 }
1169
1170
1171 =head2 SetImportBatchItemAction
1172
1173   SetImportBatchItemAction($batch_id, $new_item_action);
1174
1175 =cut
1176
1177 sub SetImportBatchItemAction {
1178     my ($batch_id, $new_item_action) = @_;
1179
1180     my $dbh = C4::Context->dbh;
1181     my $sth = $dbh->prepare("UPDATE import_batches SET item_action = ? WHERE import_batch_id = ?");
1182     $sth->execute($new_item_action, $batch_id);
1183     $sth->finish();
1184
1185 }
1186
1187 =head2 GetImportBatchMatcher
1188
1189   my $matcher_id = GetImportBatchMatcher($batch_id);
1190
1191 =cut
1192
1193 sub GetImportBatchMatcher {
1194     my ($batch_id) = @_;
1195
1196     my $dbh = C4::Context->dbh;
1197     my $sth = $dbh->prepare("SELECT matcher_id FROM import_batches WHERE import_batch_id = ?");
1198     $sth->execute($batch_id);
1199     my ($matcher_id) = $sth->fetchrow_array();
1200     $sth->finish();
1201     return $matcher_id;
1202
1203 }
1204
1205
1206 =head2 SetImportBatchMatcher
1207
1208   SetImportBatchMatcher($batch_id, $new_matcher_id);
1209
1210 =cut
1211
1212 sub SetImportBatchMatcher {
1213     my ($batch_id, $new_matcher_id) = @_;
1214
1215     my $dbh = C4::Context->dbh;
1216     my $sth = $dbh->prepare("UPDATE import_batches SET matcher_id = ? WHERE import_batch_id = ?");
1217     $sth->execute($new_matcher_id, $batch_id);
1218     $sth->finish();
1219
1220 }
1221
1222 =head2 GetImportRecordOverlayStatus
1223
1224   my $overlay_status = GetImportRecordOverlayStatus($import_record_id);
1225
1226 =cut
1227
1228 sub GetImportRecordOverlayStatus {
1229     my ($import_record_id) = @_;
1230
1231     my $dbh = C4::Context->dbh;
1232     my $sth = $dbh->prepare("SELECT overlay_status FROM import_records WHERE import_record_id = ?");
1233     $sth->execute($import_record_id);
1234     my ($overlay_status) = $sth->fetchrow_array();
1235     $sth->finish();
1236     return $overlay_status;
1237
1238 }
1239
1240
1241 =head2 SetImportRecordOverlayStatus
1242
1243   SetImportRecordOverlayStatus($import_record_id, $new_overlay_status);
1244
1245 =cut
1246
1247 sub SetImportRecordOverlayStatus {
1248     my ($import_record_id, $new_overlay_status) = @_;
1249
1250     my $dbh = C4::Context->dbh;
1251     my $sth = $dbh->prepare("UPDATE import_records SET overlay_status = ? WHERE import_record_id = ?");
1252     $sth->execute($new_overlay_status, $import_record_id);
1253     $sth->finish();
1254
1255 }
1256
1257 =head2 GetImportRecordStatus
1258
1259   my $overlay_status = GetImportRecordStatus($import_record_id);
1260
1261 =cut
1262
1263 sub GetImportRecordStatus {
1264     my ($import_record_id) = @_;
1265
1266     my $dbh = C4::Context->dbh;
1267     my $sth = $dbh->prepare("SELECT status FROM import_records WHERE import_record_id = ?");
1268     $sth->execute($import_record_id);
1269     my ($overlay_status) = $sth->fetchrow_array();
1270     $sth->finish();
1271     return $overlay_status;
1272
1273 }
1274
1275
1276 =head2 SetImportRecordStatus
1277
1278   SetImportRecordStatus($import_record_id, $new_overlay_status);
1279
1280 =cut
1281
1282 sub SetImportRecordStatus {
1283     my ($import_record_id, $new_overlay_status) = @_;
1284
1285     my $dbh = C4::Context->dbh;
1286     my $sth = $dbh->prepare("UPDATE import_records SET status = ? WHERE import_record_id = ?");
1287     $sth->execute($new_overlay_status, $import_record_id);
1288     $sth->finish();
1289
1290 }
1291
1292 =head2 GetImportRecordMatches
1293
1294   my $results = GetImportRecordMatches($import_record_id, $best_only);
1295
1296 =cut
1297
1298 sub GetImportRecordMatches {
1299     my $import_record_id = shift;
1300     my $best_only = @_ ? shift : 0;
1301
1302     my $dbh = C4::Context->dbh;
1303     # FIXME currently biblio only
1304     my $sth = $dbh->prepare_cached("SELECT title, author, biblionumber,
1305                                     candidate_match_id, score, record_type
1306                                     FROM import_records
1307                                     JOIN import_record_matches USING (import_record_id)
1308                                     LEFT JOIN biblio ON (biblionumber = candidate_match_id)
1309                                     WHERE import_record_id = ?
1310                                     ORDER BY score DESC, biblionumber DESC");
1311     $sth->bind_param(1, $import_record_id);
1312     my $results = [];
1313     $sth->execute();
1314     while (my $row = $sth->fetchrow_hashref) {
1315         if ($row->{'record_type'} eq 'auth') {
1316             $row->{'authorized_heading'} = C4::AuthoritiesMarc::GetAuthorizedHeading( { authid => $row->{'candidate_match_id'} } );
1317         }
1318         next if ($row->{'record_type'} eq 'biblio' && not $row->{'biblionumber'});
1319         push @$results, $row;
1320         last if $best_only;
1321     }
1322     $sth->finish();
1323
1324     return $results;
1325     
1326 }
1327
1328
1329 =head2 SetImportRecordMatches
1330
1331   SetImportRecordMatches($import_record_id, @matches);
1332
1333 =cut
1334
1335 sub SetImportRecordMatches {
1336     my $import_record_id = shift;
1337     my @matches = @_;
1338
1339     my $dbh = C4::Context->dbh;
1340     my $delsth = $dbh->prepare("DELETE FROM import_record_matches WHERE import_record_id = ?");
1341     $delsth->execute($import_record_id);
1342     $delsth->finish();
1343
1344     my $sth = $dbh->prepare("INSERT INTO import_record_matches (import_record_id, candidate_match_id, score)
1345                                     VALUES (?, ?, ?)");
1346     foreach my $match (@matches) {
1347         $sth->execute($import_record_id, $match->{'record_id'}, $match->{'score'});
1348     }
1349 }
1350
1351
1352 # internal functions
1353
1354 sub _create_import_record {
1355     my ($batch_id, $record_sequence, $marc_record, $record_type, $encoding, $z3950random, $marc_type) = @_;
1356
1357     my $dbh = C4::Context->dbh;
1358     my $sth = $dbh->prepare("INSERT INTO import_records (import_batch_id, record_sequence, marc, marcxml, 
1359                                                          record_type, encoding, z3950random)
1360                                     VALUES (?, ?, ?, ?, ?, ?, ?)");
1361     $sth->execute($batch_id, $record_sequence, $marc_record->as_usmarc(), $marc_record->as_xml($marc_type),
1362                   $record_type, $encoding, $z3950random);
1363     my $import_record_id = $dbh->{'mysql_insertid'};
1364     $sth->finish();
1365     return $import_record_id;
1366 }
1367
1368 sub _update_import_record_marc {
1369     my ($import_record_id, $marc_record, $marc_type) = @_;
1370
1371     my $dbh = C4::Context->dbh;
1372     my $sth = $dbh->prepare("UPDATE import_records SET marc = ?, marcxml = ?
1373                              WHERE  import_record_id = ?");
1374     $sth->execute($marc_record->as_usmarc(), $marc_record->as_xml($marc_type), $import_record_id);
1375     $sth->finish();
1376 }
1377
1378 sub _add_auth_fields {
1379     my ($import_record_id, $marc_record) = @_;
1380
1381     my $controlnumber;
1382     if ($marc_record->field('001')) {
1383         $controlnumber = $marc_record->field('001')->data();
1384     }
1385     my $authorized_heading = C4::AuthoritiesMarc::GetAuthorizedHeading({ record => $marc_record });
1386     my $dbh = C4::Context->dbh;
1387     my $sth = $dbh->prepare("INSERT INTO import_auths (import_record_id, control_number, authorized_heading) VALUES (?, ?, ?)");
1388     $sth->execute($import_record_id, $controlnumber, $authorized_heading);
1389     $sth->finish();
1390 }
1391
1392 sub _add_biblio_fields {
1393     my ($import_record_id, $marc_record) = @_;
1394
1395     my ($title, $author, $isbn, $issn) = _parse_biblio_fields($marc_record);
1396     my $dbh = C4::Context->dbh;
1397     # FIXME no controlnumber, originalsource
1398     $isbn = C4::Koha::_isbn_cleanup($isbn); # FIXME C4::Koha::_isbn_cleanup should be made public
1399     my $sth = $dbh->prepare("INSERT INTO import_biblios (import_record_id, title, author, isbn, issn) VALUES (?, ?, ?, ?, ?)");
1400     $sth->execute($import_record_id, $title, $author, $isbn, $issn);
1401     $sth->finish();
1402                 
1403 }
1404
1405 sub _update_biblio_fields {
1406     my ($import_record_id, $marc_record) = @_;
1407
1408     my ($title, $author, $isbn, $issn) = _parse_biblio_fields($marc_record);
1409     my $dbh = C4::Context->dbh;
1410     # FIXME no controlnumber, originalsource
1411     # FIXME 2 - should regularize normalization of ISBN wherever it is done
1412     $isbn =~ s/\(.*$//;
1413     $isbn =~ tr/ -_//;
1414     $isbn = uc $isbn;
1415     my $sth = $dbh->prepare("UPDATE import_biblios SET title = ?, author = ?, isbn = ?, issn = ?
1416                              WHERE  import_record_id = ?");
1417     $sth->execute($title, $author, $isbn, $issn, $import_record_id);
1418     $sth->finish();
1419 }
1420
1421 sub _parse_biblio_fields {
1422     my ($marc_record) = @_;
1423
1424     my $dbh = C4::Context->dbh;
1425     my $bibliofields = TransformMarcToKoha($dbh, $marc_record, '');
1426     return ($bibliofields->{'title'}, $bibliofields->{'author'}, $bibliofields->{'isbn'}, $bibliofields->{'issn'});
1427
1428 }
1429
1430 sub _update_batch_record_counts {
1431     my ($batch_id) = @_;
1432
1433     my $dbh = C4::Context->dbh;
1434     my $sth = $dbh->prepare_cached("UPDATE import_batches SET
1435                                         num_records = (
1436                                             SELECT COUNT(*)
1437                                             FROM import_records
1438                                             WHERE import_batch_id = import_batches.import_batch_id),
1439                                         num_items = (
1440                                             SELECT COUNT(*)
1441                                             FROM import_records
1442                                             JOIN import_items USING (import_record_id)
1443                                             WHERE import_batch_id = import_batches.import_batch_id
1444                                             AND record_type = 'biblio')
1445                                     WHERE import_batch_id = ?");
1446     $sth->bind_param(1, $batch_id);
1447     $sth->execute();
1448     $sth->finish();
1449 }
1450
1451 sub _get_commit_action {
1452     my ($overlay_action, $nomatch_action, $item_action, $overlay_status, $import_record_id, $record_type) = @_;
1453     
1454     if ($record_type eq 'biblio') {
1455         my ($bib_result, $bib_match, $item_result);
1456
1457         if ($overlay_status ne 'no_match') {
1458             $bib_match = GetBestRecordMatch($import_record_id);
1459             if ($overlay_action eq 'replace') {
1460                 $bib_result  = defined($bib_match) ? 'replace' : 'create_new';
1461             } elsif ($overlay_action eq 'create_new') {
1462                 $bib_result  = 'create_new';
1463             } elsif ($overlay_action eq 'ignore') {
1464                 $bib_result  = 'ignore';
1465             }
1466             $item_result = ($item_action eq 'always_add' or $item_action eq 'add_only_for_matches') ? 'create_new' : 'ignore';
1467         } else {
1468             $bib_result = $nomatch_action;
1469             $item_result = ($item_action eq 'always_add' or $item_action eq 'add_only_for_new')     ? 'create_new' : 'ignore';
1470         }
1471         return ($bib_result, $item_result, $bib_match);
1472     } else { # must be auths
1473         my ($auth_result, $auth_match);
1474
1475         if ($overlay_status ne 'no_match') {
1476             $auth_match = GetBestRecordMatch($import_record_id);
1477             if ($overlay_action eq 'replace') {
1478                 $auth_result  = defined($auth_match) ? 'replace' : 'create_new';
1479             } elsif ($overlay_action eq 'create_new') {
1480                 $auth_result  = 'create_new';
1481             } elsif ($overlay_action eq 'ignore') {
1482                 $auth_result  = 'ignore';
1483             }
1484         } else {
1485             $auth_result = $nomatch_action;
1486         }
1487
1488         return ($auth_result, undef, $auth_match);
1489
1490     }
1491 }
1492
1493 sub _get_revert_action {
1494     my ($overlay_action, $overlay_status, $status) = @_;
1495
1496     my $bib_result;
1497
1498     if ($status eq 'ignored') {
1499         $bib_result = 'ignore';
1500     } else {
1501         if ($overlay_action eq 'create_new') {
1502             $bib_result = 'delete';
1503         } else {
1504             $bib_result = ($overlay_status eq 'match_applied') ? 'restore' : 'delete';
1505         }
1506     }
1507     return $bib_result;
1508 }
1509
1510 1;
1511 __END__
1512
1513 =head1 AUTHOR
1514
1515 Koha Development Team <http://koha-community.org/>
1516
1517 Galen Charlton <galen.charlton@liblime.com>
1518
1519 =cut