Bug 18260: Koha::Biblio - Remove GetBiblio
[koha.git] / C4 / ImportBatch.pm
1 package C4::ImportBatch;
2
3 # Copyright (C) 2007 LibLime, 2012 C & P Bibliography Services
4 #
5 # This file is part of Koha.
6 #
7 # Koha is free software; you can redistribute it and/or modify it
8 # under the terms of the GNU General Public License as published by
9 # the Free Software Foundation; either version 3 of the License, or
10 # (at your option) any later version.
11 #
12 # Koha is distributed in the hope that it will be useful, but
13 # WITHOUT ANY WARRANTY; without even the implied warranty of
14 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15 # GNU General Public License for more details.
16 #
17 # You should have received a copy of the GNU General Public License
18 # along with Koha; if not, see <http://www.gnu.org/licenses>.
19
20 use strict;
21 use warnings;
22
23 use C4::Context;
24 use C4::Koha;
25 use C4::Biblio;
26 use C4::Items;
27 use C4::Charset;
28 use C4::AuthoritiesMarc;
29 use C4::MarcModificationTemplates;
30 use Koha::Plugins::Handler;
31 use Koha::Logger;
32
33 use vars qw(@ISA @EXPORT @EXPORT_OK %EXPORT_TAGS);
34
35 BEGIN {
36         require Exporter;
37         @ISA    = qw(Exporter);
38         @EXPORT = qw(
39     GetZ3950BatchId
40     GetWebserviceBatchId
41     GetImportRecordMarc
42     GetImportRecordMarcXML
43     AddImportBatch
44     GetImportBatch
45     AddAuthToBatch
46     AddBiblioToBatch
47     AddItemsToImportBiblio
48     ModAuthorityInBatch
49     ModBiblioInBatch
50
51     BatchStageMarcRecords
52     BatchFindDuplicates
53     BatchCommitRecords
54     BatchRevertRecords
55     CleanBatch
56     DeleteBatch
57
58     GetAllImportBatches
59     GetStagedWebserviceBatches
60     GetImportBatchRangeDesc
61     GetNumberOfNonZ3950ImportBatches
62     GetImportBiblios
63     GetImportRecordsRange
64         GetItemNumbersFromImportBatch
65     
66     GetImportBatchStatus
67     SetImportBatchStatus
68     GetImportBatchOverlayAction
69     SetImportBatchOverlayAction
70     GetImportBatchNoMatchAction
71     SetImportBatchNoMatchAction
72     GetImportBatchItemAction
73     SetImportBatchItemAction
74     GetImportBatchMatcher
75     SetImportBatchMatcher
76     GetImportRecordOverlayStatus
77     SetImportRecordOverlayStatus
78     GetImportRecordStatus
79     SetImportRecordStatus
80     GetImportRecordMatches
81     SetImportRecordMatches
82         );
83 }
84
85 our $logger = Koha::Logger->get( { category => 'C4.ImportBatch' } );
86
87 =head1 NAME
88
89 C4::ImportBatch - manage batches of imported MARC records
90
91 =head1 SYNOPSIS
92
93 use C4::ImportBatch;
94
95 =head1 FUNCTIONS
96
97 =head2 GetZ3950BatchId
98
99   my $batchid = GetZ3950BatchId($z3950server);
100
101 Retrieves the ID of the import batch for the Z39.50
102 reservoir for the given target.  If necessary,
103 creates the import batch.
104
105 =cut
106
107 sub GetZ3950BatchId {
108     my ($z3950server) = @_;
109
110     my $dbh = C4::Context->dbh;
111     my $sth = $dbh->prepare("SELECT import_batch_id FROM import_batches
112                              WHERE  batch_type = 'z3950'
113                              AND    file_name = ?");
114     $sth->execute($z3950server);
115     my $rowref = $sth->fetchrow_arrayref();
116     $sth->finish();
117     if (defined $rowref) {
118         return $rowref->[0];
119     } else {
120         my $batch_id = AddImportBatch( {
121                 overlay_action => 'create_new',
122                 import_status => 'staged',
123                 batch_type => 'z3950',
124                 file_name => $z3950server,
125             } );
126         return $batch_id;
127     }
128     
129 }
130
131 =head2 GetWebserviceBatchId
132
133   my $batchid = GetWebserviceBatchId();
134
135 Retrieves the ID of the import batch for webservice.
136 If necessary, creates the import batch.
137
138 =cut
139
140 my $WEBSERVICE_BASE_QRY = <<EOQ;
141 SELECT import_batch_id FROM import_batches
142 WHERE  batch_type = 'webservice'
143 AND    import_status = 'staged'
144 EOQ
145 sub GetWebserviceBatchId {
146     my ($params) = @_;
147
148     my $dbh = C4::Context->dbh;
149     my $sql = $WEBSERVICE_BASE_QRY;
150     my @args;
151     foreach my $field (qw(matcher_id overlay_action nomatch_action item_action)) {
152         if (my $val = $params->{$field}) {
153             $sql .= " AND $field = ?";
154             push @args, $val;
155         }
156     }
157     my $id = $dbh->selectrow_array($sql, undef, @args);
158     return $id if $id;
159
160     $params->{batch_type} = 'webservice';
161     $params->{import_status} = 'staged';
162     return AddImportBatch($params);
163 }
164
165 =head2 GetImportRecordMarc
166
167   my ($marcblob, $encoding) = GetImportRecordMarc($import_record_id);
168
169 =cut
170
171 sub GetImportRecordMarc {
172     my ($import_record_id) = @_;
173
174     my $dbh = C4::Context->dbh;
175     my ( $marc, $encoding ) = $dbh->selectrow_array(q|
176         SELECT marc, encoding
177         FROM import_records
178         WHERE import_record_id = ?
179     |, undef, $import_record_id );
180
181     return $marc, $encoding;
182 }
183
184 sub GetRecordFromImportBiblio {
185     my ( $import_record_id, $embed_items ) = @_;
186
187     my ($marc) = GetImportRecordMarc($import_record_id);
188     my $record = MARC::Record->new_from_usmarc($marc);
189
190     EmbedItemsInImportBiblio( $record, $import_record_id ) if $embed_items;
191
192     return $record;
193 }
194
195 sub EmbedItemsInImportBiblio {
196     my ( $record, $import_record_id ) = @_;
197     my ( $itemtag, $itemsubfield ) = GetMarcFromKohaField("items.itemnumber", '');
198     my $dbh = C4::Context->dbh;
199     my $import_items = $dbh->selectall_arrayref(q|
200         SELECT import_items.marcxml
201         FROM import_items
202         WHERE import_record_id = ?
203     |, { Slice => {} }, $import_record_id );
204     my @item_fields;
205     for my $import_item ( @$import_items ) {
206         my $item_marc = MARC::Record::new_from_xml($import_item->{marcxml});
207         push @item_fields, $item_marc->field($itemtag);
208     }
209     $record->append_fields(@item_fields);
210     return $record;
211 }
212
213 =head2 GetImportRecordMarcXML
214
215   my $marcxml = GetImportRecordMarcXML($import_record_id);
216
217 =cut
218
219 sub GetImportRecordMarcXML {
220     my ($import_record_id) = @_;
221
222     my $dbh = C4::Context->dbh;
223     my $sth = $dbh->prepare("SELECT marcxml FROM import_records WHERE import_record_id = ?");
224     $sth->execute($import_record_id);
225     my ($marcxml) = $sth->fetchrow();
226     $sth->finish();
227     return $marcxml;
228
229 }
230
231 =head2 AddImportBatch
232
233   my $batch_id = AddImportBatch($params_hash);
234
235 =cut
236
237 sub AddImportBatch {
238     my ($params) = @_;
239
240     my (@fields, @vals);
241     foreach (qw( matcher_id template_id branchcode
242                  overlay_action nomatch_action item_action
243                  import_status batch_type file_name comments record_type )) {
244         if (exists $params->{$_}) {
245             push @fields, $_;
246             push @vals, $params->{$_};
247         }
248     }
249     my $dbh = C4::Context->dbh;
250     $dbh->do("INSERT INTO import_batches (".join( ',', @fields).")
251                                   VALUES (".join( ',', map '?', @fields).")",
252              undef,
253              @vals);
254     return $dbh->{'mysql_insertid'};
255 }
256
257 =head2 GetImportBatch 
258
259   my $row = GetImportBatch($batch_id);
260
261 Retrieve a hashref of an import_batches row.
262
263 =cut
264
265 sub GetImportBatch {
266     my ($batch_id) = @_;
267
268     my $dbh = C4::Context->dbh;
269     my $sth = $dbh->prepare_cached("SELECT * FROM import_batches WHERE import_batch_id = ?");
270     $sth->bind_param(1, $batch_id);
271     $sth->execute();
272     my $result = $sth->fetchrow_hashref;
273     $sth->finish();
274     return $result;
275
276 }
277
278 =head2 AddBiblioToBatch 
279
280   my $import_record_id = AddBiblioToBatch($batch_id, $record_sequence, 
281                 $marc_record, $encoding, $z3950random, $update_counts);
282
283 =cut
284
285 sub AddBiblioToBatch {
286     my $batch_id = shift;
287     my $record_sequence = shift;
288     my $marc_record = shift;
289     my $encoding = shift;
290     my $z3950random = shift;
291     my $update_counts = @_ ? shift : 1;
292
293     my $import_record_id = _create_import_record($batch_id, $record_sequence, $marc_record, 'biblio', $encoding, $z3950random, C4::Context->preference('marcflavour'));
294     _add_biblio_fields($import_record_id, $marc_record);
295     _update_batch_record_counts($batch_id) if $update_counts;
296     return $import_record_id;
297 }
298
299 =head2 ModBiblioInBatch
300
301   ModBiblioInBatch($import_record_id, $marc_record);
302
303 =cut
304
305 sub ModBiblioInBatch {
306     my ($import_record_id, $marc_record) = @_;
307
308     _update_import_record_marc($import_record_id, $marc_record, C4::Context->preference('marcflavour'));
309     _update_biblio_fields($import_record_id, $marc_record);
310
311 }
312
313 =head2 AddAuthToBatch
314
315   my $import_record_id = AddAuthToBatch($batch_id, $record_sequence,
316                 $marc_record, $encoding, $z3950random, $update_counts, [$marc_type]);
317
318 =cut
319
320 sub AddAuthToBatch {
321     my $batch_id = shift;
322     my $record_sequence = shift;
323     my $marc_record = shift;
324     my $encoding = shift;
325     my $z3950random = shift;
326     my $update_counts = @_ ? shift : 1;
327     my $marc_type = shift || C4::Context->preference('marcflavour');
328
329     $marc_type = 'UNIMARCAUTH' if $marc_type eq 'UNIMARC';
330
331     my $import_record_id = _create_import_record($batch_id, $record_sequence, $marc_record, 'auth', $encoding, $z3950random, $marc_type);
332     _add_auth_fields($import_record_id, $marc_record);
333     _update_batch_record_counts($batch_id) if $update_counts;
334     return $import_record_id;
335 }
336
337 =head2 ModAuthInBatch
338
339   ModAuthInBatch($import_record_id, $marc_record);
340
341 =cut
342
343 sub ModAuthInBatch {
344     my ($import_record_id, $marc_record) = @_;
345
346     my $marcflavour = C4::Context->preference('marcflavour');
347     _update_import_record_marc($import_record_id, $marc_record, $marcflavour eq 'UNIMARC' ? 'UNIMARCAUTH' : 'USMARC');
348
349 }
350
351 =head2 BatchStageMarcRecords
352
353 ( $batch_id, $num_records, $num_items, @invalid_records ) =
354   BatchStageMarcRecords(
355     $encoding,                   $marc_records,
356     $file_name,                  $to_marc_plugin,
357     $marc_modification_template, $comments,
358     $branch_code,                $parse_items,
359     $leave_as_staging,           $progress_interval,
360     $progress_callback
361   );
362
363 =cut
364
365 sub BatchStageMarcRecords {
366     my $record_type = shift;
367     my $encoding = shift;
368     my $marc_records = shift;
369     my $file_name = shift;
370     my $to_marc_plugin = shift;
371     my $marc_modification_template = shift;
372     my $comments = shift;
373     my $branch_code = shift;
374     my $parse_items = shift;
375     my $leave_as_staging = shift;
376
377     # optional callback to monitor status 
378     # of job
379     my $progress_interval = 0;
380     my $progress_callback = undef;
381     if ($#_ == 1) {
382         $progress_interval = shift;
383         $progress_callback = shift;
384         $progress_interval = 0 unless $progress_interval =~ /^\d+$/ and $progress_interval > 0;
385         $progress_interval = 0 unless 'CODE' eq ref $progress_callback;
386     } 
387     
388     my $batch_id = AddImportBatch( {
389             overlay_action => 'create_new',
390             import_status => 'staging',
391             batch_type => 'batch',
392             file_name => $file_name,
393             comments => $comments,
394             record_type => $record_type,
395         } );
396     if ($parse_items) {
397         SetImportBatchItemAction($batch_id, 'always_add');
398     } else {
399         SetImportBatchItemAction($batch_id, 'ignore');
400     }
401
402     $marc_records = Koha::Plugins::Handler->run(
403         {
404             class  => $to_marc_plugin,
405             method => 'to_marc',
406             params => { data => $marc_records }
407         }
408     ) if $to_marc_plugin && @$marc_records;
409
410     my $marc_type = C4::Context->preference('marcflavour');
411     $marc_type .= 'AUTH' if ($marc_type eq 'UNIMARC' && $record_type eq 'auth');
412     my @invalid_records = ();
413     my $num_valid = 0;
414     my $num_items = 0;
415     # FIXME - for now, we're dealing only with bibs
416     my $rec_num = 0;
417     foreach my $marc_record (@$marc_records) {
418         $rec_num++;
419         if ($progress_interval and (0 == ($rec_num % $progress_interval))) {
420             &$progress_callback($rec_num);
421         }
422
423         ModifyRecordWithTemplate( $marc_modification_template, $marc_record ) if ( $marc_modification_template );
424
425         my $import_record_id;
426         if (scalar($marc_record->fields()) == 0) {
427             push @invalid_records, $marc_record;
428         } else {
429
430             # Normalize the record so it doesn't have separated diacritics
431             SetUTF8Flag($marc_record);
432
433             $num_valid++;
434             if ($record_type eq 'biblio') {
435                 $import_record_id = AddBiblioToBatch($batch_id, $rec_num, $marc_record, $encoding, int(rand(99999)), 0);
436                 if ($parse_items) {
437                     my @import_items_ids = AddItemsToImportBiblio($batch_id, $import_record_id, $marc_record, 0);
438                     $num_items += scalar(@import_items_ids);
439                 }
440             } elsif ($record_type eq 'auth') {
441                 $import_record_id = AddAuthToBatch($batch_id, $rec_num, $marc_record, $encoding, int(rand(99999)), 0, $marc_type);
442             }
443         }
444     }
445     unless ($leave_as_staging) {
446         SetImportBatchStatus($batch_id, 'staged');
447     }
448     # FIXME branch_code, number of bibs, number of items
449     _update_batch_record_counts($batch_id);
450     return ($batch_id, $num_valid, $num_items, @invalid_records);
451 }
452
453 =head2 AddItemsToImportBiblio
454
455   my @import_items_ids = AddItemsToImportBiblio($batch_id, 
456                 $import_record_id, $marc_record, $update_counts);
457
458 =cut
459
460 sub AddItemsToImportBiblio {
461     my $batch_id = shift;
462     my $import_record_id = shift;
463     my $marc_record = shift;
464     my $update_counts = @_ ? shift : 0;
465
466     my @import_items_ids = ();
467    
468     my $dbh = C4::Context->dbh; 
469     my ($item_tag,$item_subfield) = &GetMarcFromKohaField("items.itemnumber",'');
470     foreach my $item_field ($marc_record->field($item_tag)) {
471         my $item_marc = MARC::Record->new();
472         $item_marc->leader("00000    a              "); # must set Leader/09 to 'a'
473         $item_marc->append_fields($item_field);
474         $marc_record->delete_field($item_field);
475         my $sth = $dbh->prepare_cached("INSERT INTO import_items (import_record_id, status, marcxml)
476                                         VALUES (?, ?, ?)");
477         $sth->bind_param(1, $import_record_id);
478         $sth->bind_param(2, 'staged');
479         $sth->bind_param(3, $item_marc->as_xml());
480         $sth->execute();
481         push @import_items_ids, $dbh->{'mysql_insertid'};
482         $sth->finish();
483     }
484
485     if ($#import_items_ids > -1) {
486         _update_batch_record_counts($batch_id) if $update_counts;
487         _update_import_record_marc($import_record_id, $marc_record, C4::Context->preference('marcflavour'));
488     }
489     return @import_items_ids;
490 }
491
492 =head2 BatchFindDuplicates
493
494   my $num_with_matches = BatchFindDuplicates($batch_id, $matcher,
495              $max_matches, $progress_interval, $progress_callback);
496
497 Goes through the records loaded in the batch and attempts to 
498 find duplicates for each one.  Sets the matching status 
499 of each record to "no_match" or "auto_match" as appropriate.
500
501 The $max_matches parameter is optional; if it is not supplied,
502 it defaults to 10.
503
504 The $progress_interval and $progress_callback parameters are 
505 optional; if both are supplied, the sub referred to by
506 $progress_callback will be invoked every $progress_interval
507 records using the number of records processed as the 
508 singular argument.
509
510 =cut
511
512 sub BatchFindDuplicates {
513     my $batch_id = shift;
514     my $matcher = shift;
515     my $max_matches = @_ ? shift : 10;
516
517     # optional callback to monitor status 
518     # of job
519     my $progress_interval = 0;
520     my $progress_callback = undef;
521     if ($#_ == 1) {
522         $progress_interval = shift;
523         $progress_callback = shift;
524         $progress_interval = 0 unless $progress_interval =~ /^\d+$/ and $progress_interval > 0;
525         $progress_interval = 0 unless 'CODE' eq ref $progress_callback;
526     }
527
528     my $dbh = C4::Context->dbh;
529
530     my $sth = $dbh->prepare("SELECT import_record_id, record_type, marc
531                              FROM import_records
532                              WHERE import_batch_id = ?");
533     $sth->execute($batch_id);
534     my $num_with_matches = 0;
535     my $rec_num = 0;
536     while (my $rowref = $sth->fetchrow_hashref) {
537         $rec_num++;
538         if ($progress_interval and (0 == ($rec_num % $progress_interval))) {
539             &$progress_callback($rec_num);
540         }
541         my $marc_record = MARC::Record->new_from_usmarc($rowref->{'marc'});
542         my @matches = ();
543         if (defined $matcher) {
544             @matches = $matcher->get_matches($marc_record, $max_matches);
545         }
546         if (scalar(@matches) > 0) {
547             $num_with_matches++;
548             SetImportRecordMatches($rowref->{'import_record_id'}, @matches);
549             SetImportRecordOverlayStatus($rowref->{'import_record_id'}, 'auto_match');
550         } else {
551             SetImportRecordMatches($rowref->{'import_record_id'}, ());
552             SetImportRecordOverlayStatus($rowref->{'import_record_id'}, 'no_match');
553         }
554     }
555     $sth->finish();
556     return $num_with_matches;
557 }
558
559 =head2 BatchCommitRecords
560
561   my ($num_added, $num_updated, $num_items_added, $num_items_replaced, $num_items_errored, $num_ignored) =
562         BatchCommitRecords($batch_id, $framework,
563         $progress_interval, $progress_callback);
564
565 =cut
566
567 sub BatchCommitRecords {
568     my $batch_id = shift;
569     my $framework = shift;
570
571     # optional callback to monitor status 
572     # of job
573     my $progress_interval = 0;
574     my $progress_callback = undef;
575     if ($#_ == 1) {
576         $progress_interval = shift;
577         $progress_callback = shift;
578         $progress_interval = 0 unless $progress_interval =~ /^\d+$/ and $progress_interval > 0;
579         $progress_interval = 0 unless 'CODE' eq ref $progress_callback;
580     }
581
582     my $record_type;
583     my $num_added = 0;
584     my $num_updated = 0;
585     my $num_items_added = 0;
586     my $num_items_replaced = 0;
587     my $num_items_errored = 0;
588     my $num_ignored = 0;
589     # commit (i.e., save, all records in the batch)
590     SetImportBatchStatus('importing');
591     my $overlay_action = GetImportBatchOverlayAction($batch_id);
592     my $nomatch_action = GetImportBatchNoMatchAction($batch_id);
593     my $item_action = GetImportBatchItemAction($batch_id);
594     my $item_tag;
595     my $item_subfield;
596     my $dbh = C4::Context->dbh;
597     my $sth = $dbh->prepare("SELECT import_records.import_record_id, record_type, status, overlay_status, marc, encoding
598                              FROM import_records
599                              LEFT JOIN import_auths ON (import_records.import_record_id=import_auths.import_record_id)
600                              LEFT JOIN import_biblios ON (import_records.import_record_id=import_biblios.import_record_id)
601                              WHERE import_batch_id = ?");
602     $sth->execute($batch_id);
603     my $marcflavour = C4::Context->preference('marcflavour');
604     my $rec_num = 0;
605     while (my $rowref = $sth->fetchrow_hashref) {
606         $record_type = $rowref->{'record_type'};
607         $rec_num++;
608         if ($progress_interval and (0 == ($rec_num % $progress_interval))) {
609             &$progress_callback($rec_num);
610         }
611         if ($rowref->{'status'} eq 'error' or $rowref->{'status'} eq 'imported') {
612             $num_ignored++;
613             next;
614         }
615
616         my $marc_type;
617         if ($marcflavour eq 'UNIMARC' && $record_type eq 'auth') {
618             $marc_type = 'UNIMARCAUTH';
619         } elsif ($marcflavour eq 'UNIMARC') {
620             $marc_type = 'UNIMARC';
621         } else {
622             $marc_type = 'USMARC';
623         }
624         my $marc_record = MARC::Record->new_from_usmarc($rowref->{'marc'});
625
626         if ($record_type eq 'biblio') {
627             # remove any item tags - rely on BatchCommitItems
628             ($item_tag,$item_subfield) = &GetMarcFromKohaField("items.itemnumber",'');
629             foreach my $item_field ($marc_record->field($item_tag)) {
630                 $marc_record->delete_field($item_field);
631             }
632         }
633
634         my ($record_result, $item_result, $record_match) =
635             _get_commit_action($overlay_action, $nomatch_action, $item_action, 
636                                $rowref->{'overlay_status'}, $rowref->{'import_record_id'}, $record_type);
637
638         my $recordid;
639         my $query;
640         if ($record_result eq 'create_new') {
641             $num_added++;
642             if ($record_type eq 'biblio') {
643                 my $biblioitemnumber;
644                 ($recordid, $biblioitemnumber) = AddBiblio($marc_record, $framework);
645                 $query = "UPDATE import_biblios SET matched_biblionumber = ? WHERE import_record_id = ?";
646                 if ($item_result eq 'create_new' || $item_result eq 'replace') {
647                     my ($bib_items_added, $bib_items_replaced, $bib_items_errored) = BatchCommitItems($rowref->{'import_record_id'}, $recordid, $item_result);
648                     $num_items_added += $bib_items_added;
649                     $num_items_replaced += $bib_items_replaced;
650                     $num_items_errored += $bib_items_errored;
651                 }
652             } else {
653                 $recordid = AddAuthority($marc_record, undef, GuessAuthTypeCode($marc_record));
654                 $query = "UPDATE import_auths SET matched_authid = ? WHERE import_record_id = ?";
655             }
656             my $sth = $dbh->prepare_cached($query);
657             $sth->execute($recordid, $rowref->{'import_record_id'});
658             $sth->finish();
659             SetImportRecordStatus($rowref->{'import_record_id'}, 'imported');
660         } elsif ($record_result eq 'replace') {
661             $num_updated++;
662             $recordid = $record_match;
663             my $oldxml;
664             if ($record_type eq 'biblio') {
665                 my $oldbiblio = Koha::Biblios->find( $recordid );
666                 $oldxml = GetXmlBiblio($recordid);
667
668                 # remove item fields so that they don't get
669                 # added again if record is reverted
670                 # FIXME: GetXmlBiblio output should not contain item info any more! So the next foreach should not be needed. Does not hurt either; may remove old 952s that should not have been there anymore.
671                 my $old_marc = MARC::Record->new_from_xml(StripNonXmlChars($oldxml), 'UTF-8', $rowref->{'encoding'}, $marc_type);
672                 foreach my $item_field ($old_marc->field($item_tag)) {
673                     $old_marc->delete_field($item_field);
674                 }
675                 $oldxml = $old_marc->as_xml($marc_type);
676
677                 ModBiblio($marc_record, $recordid, $oldbiblio->frameworkcode);
678                 $query = "UPDATE import_biblios SET matched_biblionumber = ? WHERE import_record_id = ?";
679
680                 if ($item_result eq 'create_new' || $item_result eq 'replace') {
681                     my ($bib_items_added, $bib_items_replaced, $bib_items_errored) = BatchCommitItems($rowref->{'import_record_id'}, $recordid, $item_result);
682                     $num_items_added += $bib_items_added;
683                     $num_items_replaced += $bib_items_replaced;
684                     $num_items_errored += $bib_items_errored;
685                 }
686             } else {
687                 $oldxml = GetAuthorityXML($recordid);
688
689                 ModAuthority($recordid, $marc_record, GuessAuthTypeCode($marc_record));
690                 $query = "UPDATE import_auths SET matched_authid = ? WHERE import_record_id = ?";
691             }
692             my $sth = $dbh->prepare_cached("UPDATE import_records SET marcxml_old = ? WHERE import_record_id = ?");
693             $sth->execute($oldxml, $rowref->{'import_record_id'});
694             $sth->finish();
695             my $sth2 = $dbh->prepare_cached($query);
696             $sth2->execute($recordid, $rowref->{'import_record_id'});
697             $sth2->finish();
698             SetImportRecordOverlayStatus($rowref->{'import_record_id'}, 'match_applied');
699             SetImportRecordStatus($rowref->{'import_record_id'}, 'imported');
700         } elsif ($record_result eq 'ignore') {
701             $recordid = $record_match;
702             $num_ignored++;
703             $recordid = $record_match;
704             if ($record_type eq 'biblio' and defined $recordid and ( $item_result eq 'create_new' || $item_result eq 'replace' ) ) {
705                 my ($bib_items_added, $bib_items_replaced, $bib_items_errored) = BatchCommitItems($rowref->{'import_record_id'}, $recordid, $item_result);
706                 $num_items_added += $bib_items_added;
707          $num_items_replaced += $bib_items_replaced;
708                 $num_items_errored += $bib_items_errored;
709                 # still need to record the matched biblionumber so that the
710                 # items can be reverted
711                 my $sth2 = $dbh->prepare_cached("UPDATE import_biblios SET matched_biblionumber = ? WHERE import_record_id = ?");
712                 $sth2->execute($recordid, $rowref->{'import_record_id'});
713                 SetImportRecordOverlayStatus($rowref->{'import_record_id'}, 'match_applied');
714             }
715             SetImportRecordStatus($rowref->{'import_record_id'}, 'ignored');
716         }
717     }
718     $sth->finish();
719     SetImportBatchStatus($batch_id, 'imported');
720     return ($num_added, $num_updated, $num_items_added, $num_items_replaced, $num_items_errored, $num_ignored);
721 }
722
723 =head2 BatchCommitItems
724
725   ($num_items_added, $num_items_errored) = 
726          BatchCommitItems($import_record_id, $biblionumber);
727
728 =cut
729
730 sub BatchCommitItems {
731     my ( $import_record_id, $biblionumber, $action ) = @_;
732
733     my $dbh = C4::Context->dbh;
734
735     my $num_items_added = 0;
736     my $num_items_errored = 0;
737     my $num_items_replaced = 0;
738
739     my $sth = $dbh->prepare( "
740         SELECT import_items_id, import_items.marcxml, encoding
741         FROM import_items
742         JOIN import_records USING (import_record_id)
743         WHERE import_record_id = ?
744         ORDER BY import_items_id
745     " );
746     $sth->bind_param( 1, $import_record_id );
747     $sth->execute();
748
749     while ( my $row = $sth->fetchrow_hashref() ) {
750         my $item_marc = MARC::Record->new_from_xml( StripNonXmlChars( $row->{'marcxml'} ), 'UTF-8', $row->{'encoding'} );
751
752         # Delete date_due subfield as to not accidentally delete item checkout due dates
753         my ( $MARCfield, $MARCsubfield ) = GetMarcFromKohaField( 'items.onloan', GetFrameworkCode($biblionumber) );
754         $item_marc->field($MARCfield)->delete_subfield( code => $MARCsubfield );
755
756         my $item = TransformMarcToKoha( $item_marc );
757
758         my $duplicate_barcode = exists( $item->{'barcode'} ) && GetItemnumberFromBarcode( $item->{'barcode'} );
759         my $duplicate_itemnumber = exists( $item->{'itemnumber'} );
760
761         my $updsth = $dbh->prepare("UPDATE import_items SET status = ?, itemnumber = ? WHERE import_items_id = ?");
762         if ( $action eq "replace" && $duplicate_itemnumber ) {
763             # Duplicate itemnumbers have precedence, that way we can update barcodes by overlaying
764             ModItemFromMarc( $item_marc, $biblionumber, $item->{itemnumber} );
765             $updsth->bind_param( 1, 'imported' );
766             $updsth->bind_param( 2, $item->{itemnumber} );
767             $updsth->bind_param( 3, $row->{'import_items_id'} );
768             $updsth->execute();
769             $updsth->finish();
770             $num_items_replaced++;
771         } elsif ( $action eq "replace" && $duplicate_barcode ) {
772             my $itemnumber = GetItemnumberFromBarcode( $item->{'barcode'} );
773             ModItemFromMarc( $item_marc, $biblionumber, $itemnumber );
774             $updsth->bind_param( 1, 'imported' );
775             $updsth->bind_param( 2, $item->{itemnumber} );
776             $updsth->bind_param( 3, $row->{'import_items_id'} );
777             $updsth->execute();
778             $updsth->finish();
779             $num_items_replaced++;
780         } elsif ($duplicate_barcode) {
781             $updsth->bind_param( 1, 'error' );
782             $updsth->bind_param( 2, 'duplicate item barcode' );
783             $updsth->bind_param( 3, $row->{'import_items_id'} );
784             $updsth->execute();
785             $num_items_errored++;
786         } else {
787             my ( $item_biblionumber, $biblioitemnumber, $itemnumber ) = AddItemFromMarc( $item_marc, $biblionumber );
788             if( $itemnumber ) {
789                 $updsth->bind_param( 1, 'imported' );
790                 $updsth->bind_param( 2, $itemnumber );
791                 $updsth->bind_param( 3, $row->{'import_items_id'} );
792                 $updsth->execute();
793                 $updsth->finish();
794                 $num_items_added++;
795             }
796         }
797     }
798
799     return ( $num_items_added, $num_items_replaced, $num_items_errored );
800 }
801
802 =head2 BatchRevertRecords
803
804   my ($num_deleted, $num_errors, $num_reverted, $num_items_deleted, 
805       $num_ignored) = BatchRevertRecords($batch_id);
806
807 =cut
808
809 sub BatchRevertRecords {
810     my $batch_id = shift;
811
812     $logger->trace("C4::ImportBatch::BatchRevertRecords( $batch_id )");
813
814     my $record_type;
815     my $num_deleted = 0;
816     my $num_errors = 0;
817     my $num_reverted = 0;
818     my $num_ignored = 0;
819     my $num_items_deleted = 0;
820     # commit (i.e., save, all records in the batch)
821     SetImportBatchStatus('reverting');
822     my $overlay_action = GetImportBatchOverlayAction($batch_id);
823     my $nomatch_action = GetImportBatchNoMatchAction($batch_id);
824     my $dbh = C4::Context->dbh;
825     my $sth = $dbh->prepare("SELECT import_records.import_record_id, record_type, status, overlay_status, marcxml_old, encoding, matched_biblionumber, matched_authid
826                              FROM import_records
827                              LEFT JOIN import_auths ON (import_records.import_record_id=import_auths.import_record_id)
828                              LEFT JOIN import_biblios ON (import_records.import_record_id=import_biblios.import_record_id)
829                              WHERE import_batch_id = ?");
830     $sth->execute($batch_id);
831     my $marc_type;
832     my $marcflavour = C4::Context->preference('marcflavour');
833     while (my $rowref = $sth->fetchrow_hashref) {
834         $record_type = $rowref->{'record_type'};
835         if ($rowref->{'status'} eq 'error' or $rowref->{'status'} eq 'reverted') {
836             $num_ignored++;
837             next;
838         }
839         if ($marcflavour eq 'UNIMARC' && $record_type eq 'auth') {
840             $marc_type = 'UNIMARCAUTH';
841         } elsif ($marcflavour eq 'UNIMARC') {
842             $marc_type = 'UNIMARC';
843         } else {
844             $marc_type = 'USMARC';
845         }
846
847         my $record_result = _get_revert_action($overlay_action, $rowref->{'overlay_status'}, $rowref->{'status'});
848
849         if ($record_result eq 'delete') {
850             my $error = undef;
851             if  ($record_type eq 'biblio') {
852                 $num_items_deleted += BatchRevertItems($rowref->{'import_record_id'}, $rowref->{'matched_biblionumber'});
853                 $error = DelBiblio($rowref->{'matched_biblionumber'});
854             } else {
855                 DelAuthority({ authid => $rowref->{'matched_authid'} });
856             }
857             if (defined $error) {
858                 $num_errors++;
859             } else {
860                 $num_deleted++;
861                 SetImportRecordStatus($rowref->{'import_record_id'}, 'reverted');
862             }
863         } elsif ($record_result eq 'restore') {
864             $num_reverted++;
865             my $old_record = MARC::Record->new_from_xml(StripNonXmlChars($rowref->{'marcxml_old'}), 'UTF-8', $rowref->{'encoding'}, $marc_type);
866             if ($record_type eq 'biblio') {
867                 my $biblionumber = $rowref->{'matched_biblionumber'};
868                 my $oldbiblio = Koha::Biblios->find( $biblionumber );
869
870                 $logger->info("C4::ImportBatch::BatchRevertRecords: Biblio record $biblionumber does not exist, restoration of this record was skipped") unless $oldbiblio;
871                 next unless $oldbiblio; # Record has since been deleted. Deleted records should stay deleted.
872
873                 $num_items_deleted += BatchRevertItems($rowref->{'import_record_id'}, $rowref->{'matched_biblionumber'});
874                 ModBiblio($old_record, $biblionumber, $oldbiblio->frameworkcode);
875             } else {
876                 my $authid = $rowref->{'matched_authid'};
877                 ModAuthority($authid, $old_record, GuessAuthTypeCode($old_record));
878             }
879             SetImportRecordStatus($rowref->{'import_record_id'}, 'reverted');
880         } elsif ($record_result eq 'ignore') {
881             if ($record_type eq 'biblio') {
882                 $num_items_deleted += BatchRevertItems($rowref->{'import_record_id'}, $rowref->{'matched_biblionumber'});
883             }
884             SetImportRecordStatus($rowref->{'import_record_id'}, 'reverted');
885         }
886         my $query;
887         if ($record_type eq 'biblio') {
888             # remove matched_biblionumber only if there is no 'imported' item left
889             $query = "UPDATE import_biblios SET matched_biblionumber = NULL WHERE import_record_id = ?";
890             $query = "UPDATE import_biblios SET matched_biblionumber = NULL WHERE import_record_id = ?  AND NOT EXISTS (SELECT * FROM import_items WHERE import_items.import_record_id=import_biblios.import_record_id and status='imported')";
891         } else {
892             $query = "UPDATE import_auths SET matched_authid = NULL WHERE import_record_id = ?";
893         }
894         my $sth2 = $dbh->prepare_cached($query);
895         $sth2->execute($rowref->{'import_record_id'});
896     }
897
898     $sth->finish();
899     SetImportBatchStatus($batch_id, 'reverted');
900     return ($num_deleted, $num_errors, $num_reverted, $num_items_deleted, $num_ignored);
901 }
902
903 =head2 BatchRevertItems
904
905   my $num_items_deleted = BatchRevertItems($import_record_id, $biblionumber);
906
907 =cut
908
909 sub BatchRevertItems {
910     my ($import_record_id, $biblionumber) = @_;
911
912     my $dbh = C4::Context->dbh;
913     my $num_items_deleted = 0;
914
915     my $sth = $dbh->prepare_cached("SELECT import_items_id, itemnumber
916                                    FROM import_items
917                                    JOIN items USING (itemnumber)
918                                    WHERE import_record_id = ?");
919     $sth->bind_param(1, $import_record_id);
920     $sth->execute();
921     while (my $row = $sth->fetchrow_hashref()) {
922         my $error = DelItemCheck( $biblionumber, $row->{'itemnumber'});
923         if ($error == 1){
924             my $updsth = $dbh->prepare("UPDATE import_items SET status = ? WHERE import_items_id = ?");
925             $updsth->bind_param(1, 'reverted');
926             $updsth->bind_param(2, $row->{'import_items_id'});
927             $updsth->execute();
928             $updsth->finish();
929             $num_items_deleted++;
930         }
931         else {
932             next;
933         }
934     }
935     $sth->finish();
936     return $num_items_deleted;
937 }
938
939 =head2 CleanBatch
940
941   CleanBatch($batch_id)
942
943 Deletes all staged records from the import batch
944 and sets the status of the batch to 'cleaned'.  Note
945 that deleting a stage record does *not* affect
946 any record that has been committed to the database.
947
948 =cut
949
950 sub CleanBatch {
951     my $batch_id = shift;
952     return unless defined $batch_id;
953
954     C4::Context->dbh->do('DELETE FROM import_records WHERE import_batch_id = ?', {}, $batch_id);
955     SetImportBatchStatus($batch_id, 'cleaned');
956 }
957
958 =head2 DeleteBatch
959
960   DeleteBatch($batch_id)
961
962 Deletes the record from the database. This can only be done
963 once the batch has been cleaned.
964
965 =cut
966
967 sub DeleteBatch {
968     my $batch_id = shift;
969     return unless defined $batch_id;
970
971     my $dbh = C4::Context->dbh;
972     my $sth = $dbh->prepare('DELETE FROM import_batches WHERE import_batch_id = ?');
973     $sth->execute( $batch_id );
974 }
975
976 =head2 GetAllImportBatches
977
978   my $results = GetAllImportBatches();
979
980 Returns a references to an array of hash references corresponding
981 to all import_batches rows (of batch_type 'batch'), sorted in 
982 ascending order by import_batch_id.
983
984 =cut
985
986 sub  GetAllImportBatches {
987     my $dbh = C4::Context->dbh;
988     my $sth = $dbh->prepare_cached("SELECT * FROM import_batches
989                                     WHERE batch_type IN ('batch', 'webservice')
990                                     ORDER BY import_batch_id ASC");
991
992     my $results = [];
993     $sth->execute();
994     while (my $row = $sth->fetchrow_hashref) {
995         push @$results, $row;
996     }
997     $sth->finish();
998     return $results;
999 }
1000
1001 =head2 GetStagedWebserviceBatches
1002
1003   my $batch_ids = GetStagedWebserviceBatches();
1004
1005 Returns a references to an array of batch id's
1006 of batch_type 'webservice' that are not imported
1007
1008 =cut
1009
1010 my $PENDING_WEBSERVICE_BATCHES_QRY = <<EOQ;
1011 SELECT import_batch_id FROM import_batches
1012 WHERE batch_type = 'webservice'
1013 AND import_status = 'staged'
1014 EOQ
1015 sub  GetStagedWebserviceBatches {
1016     my $dbh = C4::Context->dbh;
1017     return $dbh->selectcol_arrayref($PENDING_WEBSERVICE_BATCHES_QRY);
1018 }
1019
1020 =head2 GetImportBatchRangeDesc
1021
1022   my $results = GetImportBatchRangeDesc($offset, $results_per_group);
1023
1024 Returns a reference to an array of hash references corresponding to
1025 import_batches rows (sorted in descending order by import_batch_id)
1026 start at the given offset.
1027
1028 =cut
1029
1030 sub GetImportBatchRangeDesc {
1031     my ($offset, $results_per_group) = @_;
1032
1033     my $dbh = C4::Context->dbh;
1034     my $query = "SELECT * FROM import_batches
1035                                     WHERE batch_type IN ('batch', 'webservice')
1036                                     ORDER BY import_batch_id DESC";
1037     my @params;
1038     if ($results_per_group){
1039         $query .= " LIMIT ?";
1040         push(@params, $results_per_group);
1041     }
1042     if ($offset){
1043         $query .= " OFFSET ?";
1044         push(@params, $offset);
1045     }
1046     my $sth = $dbh->prepare_cached($query);
1047     $sth->execute(@params);
1048     my $results = $sth->fetchall_arrayref({});
1049     $sth->finish();
1050     return $results;
1051 }
1052
1053 =head2 GetItemNumbersFromImportBatch
1054
1055   my @itemsnos = GetItemNumbersFromImportBatch($batch_id);
1056
1057 =cut
1058
1059 sub GetItemNumbersFromImportBatch {
1060     my ($batch_id) = @_;
1061     my $dbh = C4::Context->dbh;
1062     my $sql = q|
1063 SELECT itemnumber FROM import_items
1064 INNER JOIN items USING (itemnumber)
1065 INNER JOIN import_records USING (import_record_id)
1066 WHERE import_batch_id = ?|;
1067     my  $sth = $dbh->prepare( $sql );
1068     $sth->execute($batch_id);
1069     my @items ;
1070     while ( my ($itm) = $sth->fetchrow_array ) {
1071         push @items, $itm;
1072     }
1073     return @items;
1074 }
1075
1076 =head2 GetNumberOfImportBatches
1077
1078   my $count = GetNumberOfImportBatches();
1079
1080 =cut
1081
1082 sub GetNumberOfNonZ3950ImportBatches {
1083     my $dbh = C4::Context->dbh;
1084     my $sth = $dbh->prepare("SELECT COUNT(*) FROM import_batches WHERE batch_type != 'z3950'");
1085     $sth->execute();
1086     my ($count) = $sth->fetchrow_array();
1087     $sth->finish();
1088     return $count;
1089 }
1090
1091 =head2 GetImportBiblios
1092
1093   my $results = GetImportBiblios($importid);
1094
1095 =cut
1096
1097 sub GetImportBiblios {
1098     my ($import_record_id) = @_;
1099
1100     my $dbh = C4::Context->dbh;
1101     my $query = "SELECT * FROM import_biblios WHERE import_record_id = ?";
1102     return $dbh->selectall_arrayref(
1103         $query,
1104         { Slice => {} },
1105         $import_record_id
1106     );
1107
1108 }
1109
1110 =head2 GetImportRecordsRange
1111
1112   my $results = GetImportRecordsRange($batch_id, $offset, $results_per_group);
1113
1114 Returns a reference to an array of hash references corresponding to
1115 import_biblios/import_auths/import_records rows for a given batch
1116 starting at the given offset.
1117
1118 =cut
1119
1120 sub GetImportRecordsRange {
1121     my ( $batch_id, $offset, $results_per_group, $status, $parameters ) = @_;
1122
1123     my $dbh = C4::Context->dbh;
1124
1125     my $order_by = $parameters->{order_by} || 'import_record_id';
1126     ( $order_by ) = grep( /^$order_by$/, qw( import_record_id title status overlay_status ) ) ? $order_by : 'import_record_id';
1127
1128     my $order_by_direction =
1129       uc( $parameters->{order_by_direction} ) eq 'DESC' ? 'DESC' : 'ASC';
1130
1131     $order_by .= " $order_by_direction, authorized_heading" if $order_by eq 'title';
1132
1133     my $query = "SELECT title, author, isbn, issn, authorized_heading, import_records.import_record_id,
1134                                            record_sequence, status, overlay_status,
1135                                            matched_biblionumber, matched_authid, record_type
1136                                     FROM   import_records
1137                                     LEFT JOIN import_auths ON (import_records.import_record_id=import_auths.import_record_id)
1138                                     LEFT JOIN import_biblios ON (import_records.import_record_id=import_biblios.import_record_id)
1139                                     WHERE  import_batch_id = ?";
1140     my @params;
1141     push(@params, $batch_id);
1142     if ($status) {
1143         $query .= " AND status=?";
1144         push(@params,$status);
1145     }
1146
1147     $query.=" ORDER BY $order_by $order_by_direction";
1148
1149     if($results_per_group){
1150         $query .= " LIMIT ?";
1151         push(@params, $results_per_group);
1152     }
1153     if($offset){
1154         $query .= " OFFSET ?";
1155         push(@params, $offset);
1156     }
1157     my $sth = $dbh->prepare_cached($query);
1158     $sth->execute(@params);
1159     my $results = $sth->fetchall_arrayref({});
1160     $sth->finish();
1161     return $results;
1162
1163 }
1164
1165 =head2 GetBestRecordMatch
1166
1167   my $record_id = GetBestRecordMatch($import_record_id);
1168
1169 =cut
1170
1171 sub GetBestRecordMatch {
1172     my ($import_record_id) = @_;
1173
1174     my $dbh = C4::Context->dbh;
1175     my $sth = $dbh->prepare("SELECT candidate_match_id
1176                              FROM   import_record_matches
1177                              JOIN   import_records ON ( import_record_matches.import_record_id = import_records.import_record_id )
1178                              LEFT JOIN biblio ON ( candidate_match_id = biblio.biblionumber )
1179                              LEFT JOIN auth_header ON ( candidate_match_id = auth_header.authid )
1180                              WHERE  import_record_matches.import_record_id = ? AND
1181                              (  (import_records.record_type = 'biblio' AND biblio.biblionumber IS NOT NULL) OR
1182                                 (import_records.record_type = 'auth' AND auth_header.authid IS NOT NULL) )
1183                              ORDER BY score DESC, candidate_match_id DESC");
1184     $sth->execute($import_record_id);
1185     my ($record_id) = $sth->fetchrow_array();
1186     $sth->finish();
1187     return $record_id;
1188 }
1189
1190 =head2 GetImportBatchStatus
1191
1192   my $status = GetImportBatchStatus($batch_id);
1193
1194 =cut
1195
1196 sub GetImportBatchStatus {
1197     my ($batch_id) = @_;
1198
1199     my $dbh = C4::Context->dbh;
1200     my $sth = $dbh->prepare("SELECT import_status FROM import_batches WHERE import_batch_id = ?");
1201     $sth->execute($batch_id);
1202     my ($status) = $sth->fetchrow_array();
1203     $sth->finish();
1204     return $status;
1205
1206 }
1207
1208 =head2 SetImportBatchStatus
1209
1210   SetImportBatchStatus($batch_id, $new_status);
1211
1212 =cut
1213
1214 sub SetImportBatchStatus {
1215     my ($batch_id, $new_status) = @_;
1216
1217     my $dbh = C4::Context->dbh;
1218     my $sth = $dbh->prepare("UPDATE import_batches SET import_status = ? WHERE import_batch_id = ?");
1219     $sth->execute($new_status, $batch_id);
1220     $sth->finish();
1221
1222 }
1223
1224 =head2 GetImportBatchOverlayAction
1225
1226   my $overlay_action = GetImportBatchOverlayAction($batch_id);
1227
1228 =cut
1229
1230 sub GetImportBatchOverlayAction {
1231     my ($batch_id) = @_;
1232
1233     my $dbh = C4::Context->dbh;
1234     my $sth = $dbh->prepare("SELECT overlay_action FROM import_batches WHERE import_batch_id = ?");
1235     $sth->execute($batch_id);
1236     my ($overlay_action) = $sth->fetchrow_array();
1237     $sth->finish();
1238     return $overlay_action;
1239
1240 }
1241
1242
1243 =head2 SetImportBatchOverlayAction
1244
1245   SetImportBatchOverlayAction($batch_id, $new_overlay_action);
1246
1247 =cut
1248
1249 sub SetImportBatchOverlayAction {
1250     my ($batch_id, $new_overlay_action) = @_;
1251
1252     my $dbh = C4::Context->dbh;
1253     my $sth = $dbh->prepare("UPDATE import_batches SET overlay_action = ? WHERE import_batch_id = ?");
1254     $sth->execute($new_overlay_action, $batch_id);
1255     $sth->finish();
1256
1257 }
1258
1259 =head2 GetImportBatchNoMatchAction
1260
1261   my $nomatch_action = GetImportBatchNoMatchAction($batch_id);
1262
1263 =cut
1264
1265 sub GetImportBatchNoMatchAction {
1266     my ($batch_id) = @_;
1267
1268     my $dbh = C4::Context->dbh;
1269     my $sth = $dbh->prepare("SELECT nomatch_action FROM import_batches WHERE import_batch_id = ?");
1270     $sth->execute($batch_id);
1271     my ($nomatch_action) = $sth->fetchrow_array();
1272     $sth->finish();
1273     return $nomatch_action;
1274
1275 }
1276
1277
1278 =head2 SetImportBatchNoMatchAction
1279
1280   SetImportBatchNoMatchAction($batch_id, $new_nomatch_action);
1281
1282 =cut
1283
1284 sub SetImportBatchNoMatchAction {
1285     my ($batch_id, $new_nomatch_action) = @_;
1286
1287     my $dbh = C4::Context->dbh;
1288     my $sth = $dbh->prepare("UPDATE import_batches SET nomatch_action = ? WHERE import_batch_id = ?");
1289     $sth->execute($new_nomatch_action, $batch_id);
1290     $sth->finish();
1291
1292 }
1293
1294 =head2 GetImportBatchItemAction
1295
1296   my $item_action = GetImportBatchItemAction($batch_id);
1297
1298 =cut
1299
1300 sub GetImportBatchItemAction {
1301     my ($batch_id) = @_;
1302
1303     my $dbh = C4::Context->dbh;
1304     my $sth = $dbh->prepare("SELECT item_action FROM import_batches WHERE import_batch_id = ?");
1305     $sth->execute($batch_id);
1306     my ($item_action) = $sth->fetchrow_array();
1307     $sth->finish();
1308     return $item_action;
1309
1310 }
1311
1312
1313 =head2 SetImportBatchItemAction
1314
1315   SetImportBatchItemAction($batch_id, $new_item_action);
1316
1317 =cut
1318
1319 sub SetImportBatchItemAction {
1320     my ($batch_id, $new_item_action) = @_;
1321
1322     my $dbh = C4::Context->dbh;
1323     my $sth = $dbh->prepare("UPDATE import_batches SET item_action = ? WHERE import_batch_id = ?");
1324     $sth->execute($new_item_action, $batch_id);
1325     $sth->finish();
1326
1327 }
1328
1329 =head2 GetImportBatchMatcher
1330
1331   my $matcher_id = GetImportBatchMatcher($batch_id);
1332
1333 =cut
1334
1335 sub GetImportBatchMatcher {
1336     my ($batch_id) = @_;
1337
1338     my $dbh = C4::Context->dbh;
1339     my $sth = $dbh->prepare("SELECT matcher_id FROM import_batches WHERE import_batch_id = ?");
1340     $sth->execute($batch_id);
1341     my ($matcher_id) = $sth->fetchrow_array();
1342     $sth->finish();
1343     return $matcher_id;
1344
1345 }
1346
1347
1348 =head2 SetImportBatchMatcher
1349
1350   SetImportBatchMatcher($batch_id, $new_matcher_id);
1351
1352 =cut
1353
1354 sub SetImportBatchMatcher {
1355     my ($batch_id, $new_matcher_id) = @_;
1356
1357     my $dbh = C4::Context->dbh;
1358     my $sth = $dbh->prepare("UPDATE import_batches SET matcher_id = ? WHERE import_batch_id = ?");
1359     $sth->execute($new_matcher_id, $batch_id);
1360     $sth->finish();
1361
1362 }
1363
1364 =head2 GetImportRecordOverlayStatus
1365
1366   my $overlay_status = GetImportRecordOverlayStatus($import_record_id);
1367
1368 =cut
1369
1370 sub GetImportRecordOverlayStatus {
1371     my ($import_record_id) = @_;
1372
1373     my $dbh = C4::Context->dbh;
1374     my $sth = $dbh->prepare("SELECT overlay_status FROM import_records WHERE import_record_id = ?");
1375     $sth->execute($import_record_id);
1376     my ($overlay_status) = $sth->fetchrow_array();
1377     $sth->finish();
1378     return $overlay_status;
1379
1380 }
1381
1382
1383 =head2 SetImportRecordOverlayStatus
1384
1385   SetImportRecordOverlayStatus($import_record_id, $new_overlay_status);
1386
1387 =cut
1388
1389 sub SetImportRecordOverlayStatus {
1390     my ($import_record_id, $new_overlay_status) = @_;
1391
1392     my $dbh = C4::Context->dbh;
1393     my $sth = $dbh->prepare("UPDATE import_records SET overlay_status = ? WHERE import_record_id = ?");
1394     $sth->execute($new_overlay_status, $import_record_id);
1395     $sth->finish();
1396
1397 }
1398
1399 =head2 GetImportRecordStatus
1400
1401   my $status = GetImportRecordStatus($import_record_id);
1402
1403 =cut
1404
1405 sub GetImportRecordStatus {
1406     my ($import_record_id) = @_;
1407
1408     my $dbh = C4::Context->dbh;
1409     my $sth = $dbh->prepare("SELECT status FROM import_records WHERE import_record_id = ?");
1410     $sth->execute($import_record_id);
1411     my ($status) = $sth->fetchrow_array();
1412     $sth->finish();
1413     return $status;
1414
1415 }
1416
1417
1418 =head2 SetImportRecordStatus
1419
1420   SetImportRecordStatus($import_record_id, $new_status);
1421
1422 =cut
1423
1424 sub SetImportRecordStatus {
1425     my ($import_record_id, $new_status) = @_;
1426
1427     my $dbh = C4::Context->dbh;
1428     my $sth = $dbh->prepare("UPDATE import_records SET status = ? WHERE import_record_id = ?");
1429     $sth->execute($new_status, $import_record_id);
1430     $sth->finish();
1431
1432 }
1433
1434 =head2 GetImportRecordMatches
1435
1436   my $results = GetImportRecordMatches($import_record_id, $best_only);
1437
1438 =cut
1439
1440 sub GetImportRecordMatches {
1441     my $import_record_id = shift;
1442     my $best_only = @_ ? shift : 0;
1443
1444     my $dbh = C4::Context->dbh;
1445     # FIXME currently biblio only
1446     my $sth = $dbh->prepare_cached("SELECT title, author, biblionumber,
1447                                     candidate_match_id, score, record_type
1448                                     FROM import_records
1449                                     JOIN import_record_matches USING (import_record_id)
1450                                     LEFT JOIN biblio ON (biblionumber = candidate_match_id)
1451                                     WHERE import_record_id = ?
1452                                     ORDER BY score DESC, biblionumber DESC");
1453     $sth->bind_param(1, $import_record_id);
1454     my $results = [];
1455     $sth->execute();
1456     while (my $row = $sth->fetchrow_hashref) {
1457         if ($row->{'record_type'} eq 'auth') {
1458             $row->{'authorized_heading'} = C4::AuthoritiesMarc::GetAuthorizedHeading( { authid => $row->{'candidate_match_id'} } );
1459         }
1460         next if ($row->{'record_type'} eq 'biblio' && not $row->{'biblionumber'});
1461         push @$results, $row;
1462         last if $best_only;
1463     }
1464     $sth->finish();
1465
1466     return $results;
1467     
1468 }
1469
1470 =head2 SetImportRecordMatches
1471
1472   SetImportRecordMatches($import_record_id, @matches);
1473
1474 =cut
1475
1476 sub SetImportRecordMatches {
1477     my $import_record_id = shift;
1478     my @matches = @_;
1479
1480     my $dbh = C4::Context->dbh;
1481     my $delsth = $dbh->prepare("DELETE FROM import_record_matches WHERE import_record_id = ?");
1482     $delsth->execute($import_record_id);
1483     $delsth->finish();
1484
1485     my $sth = $dbh->prepare("INSERT INTO import_record_matches (import_record_id, candidate_match_id, score)
1486                                     VALUES (?, ?, ?)");
1487     foreach my $match (@matches) {
1488         $sth->execute($import_record_id, $match->{'record_id'}, $match->{'score'});
1489     }
1490 }
1491
1492 =head2 RecordsFromISO2709File
1493
1494     my ($errors, $records) = C4::ImportBatch::RecordsFromISO2709File($input_file, $record_type, $encoding);
1495
1496 Reads ISO2709 binary porridge from the given file and creates MARC::Record-objects out of it.
1497
1498 @PARAM1, String, absolute path to the ISO2709 file.
1499 @PARAM2, String, see stage_file.pl
1500 @PARAM3, String, should be utf8
1501
1502 Returns two array refs.
1503
1504 =cut
1505
1506 sub RecordsFromISO2709File {
1507     my ($input_file, $record_type, $encoding) = @_;
1508     my @errors;
1509
1510     my $marc_type = C4::Context->preference('marcflavour');
1511     $marc_type .= 'AUTH' if ($marc_type eq 'UNIMARC' && $record_type eq 'auth');
1512
1513     open IN, "<$input_file" or die "$0: cannot open input file $input_file: $!\n";
1514     my @marc_records;
1515     $/ = "\035";
1516     while (<IN>) {
1517         s/^\s+//;
1518         s/\s+$//;
1519         next unless $_; # skip if record has only whitespace, as might occur
1520                         # if file includes newlines between each MARC record
1521         my ($marc_record, $charset_guessed, $char_errors) = MarcToUTF8Record($_, $marc_type, $encoding);
1522         push @marc_records, $marc_record;
1523         if ($charset_guessed ne $encoding) {
1524             push @errors,
1525                 "Unexpected charset $charset_guessed, expecting $encoding";
1526         }
1527     }
1528     close IN;
1529     return ( \@errors, \@marc_records );
1530 }
1531
1532 =head2 RecordsFromMARCXMLFile
1533
1534     my ($errors, $records) = C4::ImportBatch::RecordsFromMARCXMLFile($input_file, $encoding);
1535
1536 Creates MARC::Record-objects out of the given MARCXML-file.
1537
1538 @PARAM1, String, absolute path to the ISO2709 file.
1539 @PARAM2, String, should be utf8
1540
1541 Returns two array refs.
1542
1543 =cut
1544
1545 sub RecordsFromMARCXMLFile {
1546     my ( $filename, $encoding ) = @_;
1547     my $batch = MARC::File::XML->in( $filename );
1548     my ( @marcRecords, @errors, $record );
1549     do {
1550         eval { $record = $batch->next( $encoding ); };
1551         if ($@) {
1552             push @errors, $@;
1553         }
1554         push @marcRecords, $record if $record;
1555     } while( $record );
1556     return (\@errors, \@marcRecords);
1557 }
1558
1559 # internal functions
1560
1561 sub _create_import_record {
1562     my ($batch_id, $record_sequence, $marc_record, $record_type, $encoding, $z3950random, $marc_type) = @_;
1563
1564     my $dbh = C4::Context->dbh;
1565     my $sth = $dbh->prepare("INSERT INTO import_records (import_batch_id, record_sequence, marc, marcxml, 
1566                                                          record_type, encoding, z3950random)
1567                                     VALUES (?, ?, ?, ?, ?, ?, ?)");
1568     $sth->execute($batch_id, $record_sequence, $marc_record->as_usmarc(), $marc_record->as_xml($marc_type),
1569                   $record_type, $encoding, $z3950random);
1570     my $import_record_id = $dbh->{'mysql_insertid'};
1571     $sth->finish();
1572     return $import_record_id;
1573 }
1574
1575 sub _update_import_record_marc {
1576     my ($import_record_id, $marc_record, $marc_type) = @_;
1577
1578     my $dbh = C4::Context->dbh;
1579     my $sth = $dbh->prepare("UPDATE import_records SET marc = ?, marcxml = ?
1580                              WHERE  import_record_id = ?");
1581     $sth->execute($marc_record->as_usmarc(), $marc_record->as_xml($marc_type), $import_record_id);
1582     $sth->finish();
1583 }
1584
1585 sub _add_auth_fields {
1586     my ($import_record_id, $marc_record) = @_;
1587
1588     my $controlnumber;
1589     if ($marc_record->field('001')) {
1590         $controlnumber = $marc_record->field('001')->data();
1591     }
1592     my $authorized_heading = C4::AuthoritiesMarc::GetAuthorizedHeading({ record => $marc_record });
1593     my $dbh = C4::Context->dbh;
1594     my $sth = $dbh->prepare("INSERT INTO import_auths (import_record_id, control_number, authorized_heading) VALUES (?, ?, ?)");
1595     $sth->execute($import_record_id, $controlnumber, $authorized_heading);
1596     $sth->finish();
1597 }
1598
1599 sub _add_biblio_fields {
1600     my ($import_record_id, $marc_record) = @_;
1601
1602     my ($title, $author, $isbn, $issn) = _parse_biblio_fields($marc_record);
1603     my $dbh = C4::Context->dbh;
1604     # FIXME no controlnumber, originalsource
1605     $isbn = C4::Koha::GetNormalizedISBN($isbn);
1606     my $sth = $dbh->prepare("INSERT INTO import_biblios (import_record_id, title, author, isbn, issn) VALUES (?, ?, ?, ?, ?)");
1607     $sth->execute($import_record_id, $title, $author, $isbn, $issn);
1608     $sth->finish();
1609                 
1610 }
1611
1612 sub _update_biblio_fields {
1613     my ($import_record_id, $marc_record) = @_;
1614
1615     my ($title, $author, $isbn, $issn) = _parse_biblio_fields($marc_record);
1616     my $dbh = C4::Context->dbh;
1617     # FIXME no controlnumber, originalsource
1618     # FIXME 2 - should regularize normalization of ISBN wherever it is done
1619     $isbn =~ s/\(.*$//;
1620     $isbn =~ tr/ -_//;
1621     $isbn = uc $isbn;
1622     my $sth = $dbh->prepare("UPDATE import_biblios SET title = ?, author = ?, isbn = ?, issn = ?
1623                              WHERE  import_record_id = ?");
1624     $sth->execute($title, $author, $isbn, $issn, $import_record_id);
1625     $sth->finish();
1626 }
1627
1628 sub _parse_biblio_fields {
1629     my ($marc_record) = @_;
1630
1631     my $dbh = C4::Context->dbh;
1632     my $bibliofields = TransformMarcToKoha($marc_record, '');
1633     return ($bibliofields->{'title'}, $bibliofields->{'author'}, $bibliofields->{'isbn'}, $bibliofields->{'issn'});
1634
1635 }
1636
1637 sub _update_batch_record_counts {
1638     my ($batch_id) = @_;
1639
1640     my $dbh = C4::Context->dbh;
1641     my $sth = $dbh->prepare_cached("UPDATE import_batches SET
1642                                         num_records = (
1643                                             SELECT COUNT(*)
1644                                             FROM import_records
1645                                             WHERE import_batch_id = import_batches.import_batch_id),
1646                                         num_items = (
1647                                             SELECT COUNT(*)
1648                                             FROM import_records
1649                                             JOIN import_items USING (import_record_id)
1650                                             WHERE import_batch_id = import_batches.import_batch_id
1651                                             AND record_type = 'biblio')
1652                                     WHERE import_batch_id = ?");
1653     $sth->bind_param(1, $batch_id);
1654     $sth->execute();
1655     $sth->finish();
1656 }
1657
1658 sub _get_commit_action {
1659     my ($overlay_action, $nomatch_action, $item_action, $overlay_status, $import_record_id, $record_type) = @_;
1660     
1661     if ($record_type eq 'biblio') {
1662         my ($bib_result, $bib_match, $item_result);
1663
1664         if ($overlay_status ne 'no_match') {
1665             $bib_match = GetBestRecordMatch($import_record_id);
1666             if ($overlay_action eq 'replace') {
1667                 $bib_result  = defined($bib_match) ? 'replace' : 'create_new';
1668             } elsif ($overlay_action eq 'create_new') {
1669                 $bib_result  = 'create_new';
1670             } elsif ($overlay_action eq 'ignore') {
1671                 $bib_result  = 'ignore';
1672             }
1673          if($item_action eq 'always_add' or $item_action eq 'add_only_for_matches'){
1674                 $item_result = 'create_new';
1675        }
1676       elsif($item_action eq 'replace'){
1677           $item_result = 'replace';
1678           }
1679       else {
1680              $item_result = 'ignore';
1681            }
1682         } else {
1683             $bib_result = $nomatch_action;
1684             $item_result = ($item_action eq 'always_add' or $item_action eq 'add_only_for_new')     ? 'create_new' : 'ignore';
1685         }
1686         return ($bib_result, $item_result, $bib_match);
1687     } else { # must be auths
1688         my ($auth_result, $auth_match);
1689
1690         if ($overlay_status ne 'no_match') {
1691             $auth_match = GetBestRecordMatch($import_record_id);
1692             if ($overlay_action eq 'replace') {
1693                 $auth_result  = defined($auth_match) ? 'replace' : 'create_new';
1694             } elsif ($overlay_action eq 'create_new') {
1695                 $auth_result  = 'create_new';
1696             } elsif ($overlay_action eq 'ignore') {
1697                 $auth_result  = 'ignore';
1698             }
1699         } else {
1700             $auth_result = $nomatch_action;
1701         }
1702
1703         return ($auth_result, undef, $auth_match);
1704
1705     }
1706 }
1707
1708 sub _get_revert_action {
1709     my ($overlay_action, $overlay_status, $status) = @_;
1710
1711     my $bib_result;
1712
1713     if ($status eq 'ignored') {
1714         $bib_result = 'ignore';
1715     } else {
1716         if ($overlay_action eq 'create_new') {
1717             $bib_result = 'delete';
1718         } else {
1719             $bib_result = ($overlay_status eq 'match_applied') ? 'restore' : 'delete';
1720         }
1721     }
1722     return $bib_result;
1723 }
1724
1725 1;
1726 __END__
1727
1728 =head1 AUTHOR
1729
1730 Koha Development Team <http://koha-community.org/>
1731
1732 Galen Charlton <galen.charlton@liblime.com>
1733
1734 =cut