99bc4287019e41b15bef59841aeacce3e1d2b75b
[koha-equinox.git] / C4 / ImportBatch.pm
1 package C4::ImportBatch;
2
3 # Copyright (C) 2007 LibLime, 2012 C & P Bibliography Services
4 #
5 # This file is part of Koha.
6 #
7 # Koha is free software; you can redistribute it and/or modify it
8 # under the terms of the GNU General Public License as published by
9 # the Free Software Foundation; either version 3 of the License, or
10 # (at your option) any later version.
11 #
12 # Koha is distributed in the hope that it will be useful, but
13 # WITHOUT ANY WARRANTY; without even the implied warranty of
14 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15 # GNU General Public License for more details.
16 #
17 # You should have received a copy of the GNU General Public License
18 # along with Koha; if not, see <http://www.gnu.org/licenses>.
19
20 use strict;
21 use warnings;
22
23 use C4::Context;
24 use C4::Koha;
25 use C4::Biblio;
26 use C4::Items;
27 use C4::Charset;
28 use C4::AuthoritiesMarc;
29 use C4::MarcModificationTemplates;
30 use Koha::Items;
31 use Koha::Plugins::Handler;
32 use Koha::Logger;
33
34 use vars qw(@ISA @EXPORT @EXPORT_OK %EXPORT_TAGS);
35
36 BEGIN {
37         require Exporter;
38         @ISA    = qw(Exporter);
39         @EXPORT = qw(
40     GetZ3950BatchId
41     GetWebserviceBatchId
42     GetImportRecordMarc
43     GetImportRecordMarcXML
44     AddImportBatch
45     GetImportBatch
46     AddAuthToBatch
47     AddBiblioToBatch
48     AddItemsToImportBiblio
49     ModAuthorityInBatch
50     ModBiblioInBatch
51
52     BatchStageMarcRecords
53     BatchFindDuplicates
54     BatchCommitRecords
55     BatchRevertRecords
56     CleanBatch
57     DeleteBatch
58
59     GetAllImportBatches
60     GetStagedWebserviceBatches
61     GetImportBatchRangeDesc
62     GetNumberOfNonZ3950ImportBatches
63     GetImportBiblios
64     GetImportRecordsRange
65         GetItemNumbersFromImportBatch
66     
67     GetImportBatchStatus
68     SetImportBatchStatus
69     GetImportBatchOverlayAction
70     SetImportBatchOverlayAction
71     GetImportBatchNoMatchAction
72     SetImportBatchNoMatchAction
73     GetImportBatchItemAction
74     SetImportBatchItemAction
75     GetImportBatchMatcher
76     SetImportBatchMatcher
77     GetImportRecordOverlayStatus
78     SetImportRecordOverlayStatus
79     GetImportRecordStatus
80     SetImportRecordStatus
81     GetImportRecordMatches
82     SetImportRecordMatches
83         );
84 }
85
86 our $logger = Koha::Logger->get( { category => 'C4.ImportBatch' } );
87
88 =head1 NAME
89
90 C4::ImportBatch - manage batches of imported MARC records
91
92 =head1 SYNOPSIS
93
94 use C4::ImportBatch;
95
96 =head1 FUNCTIONS
97
98 =head2 GetZ3950BatchId
99
100   my $batchid = GetZ3950BatchId($z3950server);
101
102 Retrieves the ID of the import batch for the Z39.50
103 reservoir for the given target.  If necessary,
104 creates the import batch.
105
106 =cut
107
108 sub GetZ3950BatchId {
109     my ($z3950server) = @_;
110
111     my $dbh = C4::Context->dbh;
112     my $sth = $dbh->prepare("SELECT import_batch_id FROM import_batches
113                              WHERE  batch_type = 'z3950'
114                              AND    file_name = ?");
115     $sth->execute($z3950server);
116     my $rowref = $sth->fetchrow_arrayref();
117     $sth->finish();
118     if (defined $rowref) {
119         return $rowref->[0];
120     } else {
121         my $batch_id = AddImportBatch( {
122                 overlay_action => 'create_new',
123                 import_status => 'staged',
124                 batch_type => 'z3950',
125                 file_name => $z3950server,
126             } );
127         return $batch_id;
128     }
129     
130 }
131
132 =head2 GetWebserviceBatchId
133
134   my $batchid = GetWebserviceBatchId();
135
136 Retrieves the ID of the import batch for webservice.
137 If necessary, creates the import batch.
138
139 =cut
140
141 my $WEBSERVICE_BASE_QRY = <<EOQ;
142 SELECT import_batch_id FROM import_batches
143 WHERE  batch_type = 'webservice'
144 AND    import_status = 'staged'
145 EOQ
146 sub GetWebserviceBatchId {
147     my ($params) = @_;
148
149     my $dbh = C4::Context->dbh;
150     my $sql = $WEBSERVICE_BASE_QRY;
151     my @args;
152     foreach my $field (qw(matcher_id overlay_action nomatch_action item_action)) {
153         if (my $val = $params->{$field}) {
154             $sql .= " AND $field = ?";
155             push @args, $val;
156         }
157     }
158     my $id = $dbh->selectrow_array($sql, undef, @args);
159     return $id if $id;
160
161     $params->{batch_type} = 'webservice';
162     $params->{import_status} = 'staged';
163     return AddImportBatch($params);
164 }
165
166 =head2 GetImportRecordMarc
167
168   my ($marcblob, $encoding) = GetImportRecordMarc($import_record_id);
169
170 =cut
171
172 sub GetImportRecordMarc {
173     my ($import_record_id) = @_;
174
175     my $dbh = C4::Context->dbh;
176     my ( $marc, $encoding ) = $dbh->selectrow_array(q|
177         SELECT marc, encoding
178         FROM import_records
179         WHERE import_record_id = ?
180     |, undef, $import_record_id );
181
182     return $marc, $encoding;
183 }
184
185 sub GetRecordFromImportBiblio {
186     my ( $import_record_id, $embed_items ) = @_;
187
188     my ($marc) = GetImportRecordMarc($import_record_id);
189     my $record = MARC::Record->new_from_usmarc($marc);
190
191     EmbedItemsInImportBiblio( $record, $import_record_id ) if $embed_items;
192
193     return $record;
194 }
195
196 sub EmbedItemsInImportBiblio {
197     my ( $record, $import_record_id ) = @_;
198     my ( $itemtag, $itemsubfield ) = GetMarcFromKohaField("items.itemnumber", '');
199     my $dbh = C4::Context->dbh;
200     my $import_items = $dbh->selectall_arrayref(q|
201         SELECT import_items.marcxml
202         FROM import_items
203         WHERE import_record_id = ?
204     |, { Slice => {} }, $import_record_id );
205     my @item_fields;
206     for my $import_item ( @$import_items ) {
207         my $item_marc = MARC::Record::new_from_xml($import_item->{marcxml});
208         push @item_fields, $item_marc->field($itemtag);
209     }
210     $record->append_fields(@item_fields);
211     return $record;
212 }
213
214 =head2 GetImportRecordMarcXML
215
216   my $marcxml = GetImportRecordMarcXML($import_record_id);
217
218 =cut
219
220 sub GetImportRecordMarcXML {
221     my ($import_record_id) = @_;
222
223     my $dbh = C4::Context->dbh;
224     my $sth = $dbh->prepare("SELECT marcxml FROM import_records WHERE import_record_id = ?");
225     $sth->execute($import_record_id);
226     my ($marcxml) = $sth->fetchrow();
227     $sth->finish();
228     return $marcxml;
229
230 }
231
232 =head2 AddImportBatch
233
234   my $batch_id = AddImportBatch($params_hash);
235
236 =cut
237
238 sub AddImportBatch {
239     my ($params) = @_;
240
241     my (@fields, @vals);
242     foreach (qw( matcher_id template_id branchcode
243                  overlay_action nomatch_action item_action
244                  import_status batch_type file_name comments record_type )) {
245         if (exists $params->{$_}) {
246             push @fields, $_;
247             push @vals, $params->{$_};
248         }
249     }
250     my $dbh = C4::Context->dbh;
251     $dbh->do("INSERT INTO import_batches (".join( ',', @fields).")
252                                   VALUES (".join( ',', map '?', @fields).")",
253              undef,
254              @vals);
255     return $dbh->{'mysql_insertid'};
256 }
257
258 =head2 GetImportBatch 
259
260   my $row = GetImportBatch($batch_id);
261
262 Retrieve a hashref of an import_batches row.
263
264 =cut
265
266 sub GetImportBatch {
267     my ($batch_id) = @_;
268
269     my $dbh = C4::Context->dbh;
270     my $sth = $dbh->prepare_cached("SELECT * FROM import_batches WHERE import_batch_id = ?");
271     $sth->bind_param(1, $batch_id);
272     $sth->execute();
273     my $result = $sth->fetchrow_hashref;
274     $sth->finish();
275     return $result;
276
277 }
278
279 =head2 AddBiblioToBatch 
280
281   my $import_record_id = AddBiblioToBatch($batch_id, $record_sequence, 
282                 $marc_record, $encoding, $z3950random, $update_counts);
283
284 =cut
285
286 sub AddBiblioToBatch {
287     my $batch_id = shift;
288     my $record_sequence = shift;
289     my $marc_record = shift;
290     my $encoding = shift;
291     my $z3950random = shift;
292     my $update_counts = @_ ? shift : 1;
293
294     my $import_record_id = _create_import_record($batch_id, $record_sequence, $marc_record, 'biblio', $encoding, $z3950random, C4::Context->preference('marcflavour'));
295     _add_biblio_fields($import_record_id, $marc_record);
296     _update_batch_record_counts($batch_id) if $update_counts;
297     return $import_record_id;
298 }
299
300 =head2 ModBiblioInBatch
301
302   ModBiblioInBatch($import_record_id, $marc_record);
303
304 =cut
305
306 sub ModBiblioInBatch {
307     my ($import_record_id, $marc_record) = @_;
308
309     _update_import_record_marc($import_record_id, $marc_record, C4::Context->preference('marcflavour'));
310     _update_biblio_fields($import_record_id, $marc_record);
311
312 }
313
314 =head2 AddAuthToBatch
315
316   my $import_record_id = AddAuthToBatch($batch_id, $record_sequence,
317                 $marc_record, $encoding, $z3950random, $update_counts, [$marc_type]);
318
319 =cut
320
321 sub AddAuthToBatch {
322     my $batch_id = shift;
323     my $record_sequence = shift;
324     my $marc_record = shift;
325     my $encoding = shift;
326     my $z3950random = shift;
327     my $update_counts = @_ ? shift : 1;
328     my $marc_type = shift || C4::Context->preference('marcflavour');
329
330     $marc_type = 'UNIMARCAUTH' if $marc_type eq 'UNIMARC';
331
332     my $import_record_id = _create_import_record($batch_id, $record_sequence, $marc_record, 'auth', $encoding, $z3950random, $marc_type);
333     _add_auth_fields($import_record_id, $marc_record);
334     _update_batch_record_counts($batch_id) if $update_counts;
335     return $import_record_id;
336 }
337
338 =head2 ModAuthInBatch
339
340   ModAuthInBatch($import_record_id, $marc_record);
341
342 =cut
343
344 sub ModAuthInBatch {
345     my ($import_record_id, $marc_record) = @_;
346
347     my $marcflavour = C4::Context->preference('marcflavour');
348     _update_import_record_marc($import_record_id, $marc_record, $marcflavour eq 'UNIMARC' ? 'UNIMARCAUTH' : 'USMARC');
349
350 }
351
352 =head2 BatchStageMarcRecords
353
354 ( $batch_id, $num_records, $num_items, @invalid_records ) =
355   BatchStageMarcRecords(
356     $record_type,                $encoding,
357     $marc_records,               $file_name,
358     $marc_modification_template, $comments,
359     $branch_code,                $parse_items,
360     $leave_as_staging,           $progress_interval,
361     $progress_callback
362   );
363
364 =cut
365
366 sub BatchStageMarcRecords {
367     my $record_type = shift;
368     my $encoding = shift;
369     my $marc_records = shift;
370     my $file_name = shift;
371     my $marc_modification_template = shift;
372     my $comments = shift;
373     my $branch_code = shift;
374     my $parse_items = shift;
375     my $leave_as_staging = shift;
376
377     # optional callback to monitor status 
378     # of job
379     my $progress_interval = 0;
380     my $progress_callback = undef;
381     if ($#_ == 1) {
382         $progress_interval = shift;
383         $progress_callback = shift;
384         $progress_interval = 0 unless $progress_interval =~ /^\d+$/ and $progress_interval > 0;
385         $progress_interval = 0 unless 'CODE' eq ref $progress_callback;
386     } 
387     
388     my $batch_id = AddImportBatch( {
389             overlay_action => 'create_new',
390             import_status => 'staging',
391             batch_type => 'batch',
392             file_name => $file_name,
393             comments => $comments,
394             record_type => $record_type,
395         } );
396     if ($parse_items) {
397         SetImportBatchItemAction($batch_id, 'always_add');
398     } else {
399         SetImportBatchItemAction($batch_id, 'ignore');
400     }
401
402
403     my $marc_type = C4::Context->preference('marcflavour');
404     $marc_type .= 'AUTH' if ($marc_type eq 'UNIMARC' && $record_type eq 'auth');
405     my @invalid_records = ();
406     my $num_valid = 0;
407     my $num_items = 0;
408     # FIXME - for now, we're dealing only with bibs
409     my $rec_num = 0;
410     foreach my $marc_record (@$marc_records) {
411         $rec_num++;
412         if ($progress_interval and (0 == ($rec_num % $progress_interval))) {
413             &$progress_callback($rec_num);
414         }
415
416         ModifyRecordWithTemplate( $marc_modification_template, $marc_record ) if ( $marc_modification_template );
417
418         my $import_record_id;
419         if (scalar($marc_record->fields()) == 0) {
420             push @invalid_records, $marc_record;
421         } else {
422
423             # Normalize the record so it doesn't have separated diacritics
424             SetUTF8Flag($marc_record);
425
426             $num_valid++;
427             if ($record_type eq 'biblio') {
428                 $import_record_id = AddBiblioToBatch($batch_id, $rec_num, $marc_record, $encoding, int(rand(99999)), 0);
429                 if ($parse_items) {
430                     my @import_items_ids = AddItemsToImportBiblio($batch_id, $import_record_id, $marc_record, 0);
431                     $num_items += scalar(@import_items_ids);
432                 }
433             } elsif ($record_type eq 'auth') {
434                 $import_record_id = AddAuthToBatch($batch_id, $rec_num, $marc_record, $encoding, int(rand(99999)), 0, $marc_type);
435             }
436         }
437     }
438     unless ($leave_as_staging) {
439         SetImportBatchStatus($batch_id, 'staged');
440     }
441     # FIXME branch_code, number of bibs, number of items
442     _update_batch_record_counts($batch_id);
443     return ($batch_id, $num_valid, $num_items, @invalid_records);
444 }
445
446 =head2 AddItemsToImportBiblio
447
448   my @import_items_ids = AddItemsToImportBiblio($batch_id, 
449                 $import_record_id, $marc_record, $update_counts);
450
451 =cut
452
453 sub AddItemsToImportBiblio {
454     my $batch_id = shift;
455     my $import_record_id = shift;
456     my $marc_record = shift;
457     my $update_counts = @_ ? shift : 0;
458
459     my @import_items_ids = ();
460    
461     my $dbh = C4::Context->dbh; 
462     my ($item_tag,$item_subfield) = &GetMarcFromKohaField("items.itemnumber",'');
463     foreach my $item_field ($marc_record->field($item_tag)) {
464         my $item_marc = MARC::Record->new();
465         $item_marc->leader("00000    a              "); # must set Leader/09 to 'a'
466         $item_marc->append_fields($item_field);
467         $marc_record->delete_field($item_field);
468         my $sth = $dbh->prepare_cached("INSERT INTO import_items (import_record_id, status, marcxml)
469                                         VALUES (?, ?, ?)");
470         $sth->bind_param(1, $import_record_id);
471         $sth->bind_param(2, 'staged');
472         $sth->bind_param(3, $item_marc->as_xml());
473         $sth->execute();
474         push @import_items_ids, $dbh->{'mysql_insertid'};
475         $sth->finish();
476     }
477
478     if ($#import_items_ids > -1) {
479         _update_batch_record_counts($batch_id) if $update_counts;
480         _update_import_record_marc($import_record_id, $marc_record, C4::Context->preference('marcflavour'));
481     }
482     return @import_items_ids;
483 }
484
485 =head2 BatchFindDuplicates
486
487   my $num_with_matches = BatchFindDuplicates($batch_id, $matcher,
488              $max_matches, $progress_interval, $progress_callback);
489
490 Goes through the records loaded in the batch and attempts to 
491 find duplicates for each one.  Sets the matching status 
492 of each record to "no_match" or "auto_match" as appropriate.
493
494 The $max_matches parameter is optional; if it is not supplied,
495 it defaults to 10.
496
497 The $progress_interval and $progress_callback parameters are 
498 optional; if both are supplied, the sub referred to by
499 $progress_callback will be invoked every $progress_interval
500 records using the number of records processed as the 
501 singular argument.
502
503 =cut
504
505 sub BatchFindDuplicates {
506     my $batch_id = shift;
507     my $matcher = shift;
508     my $max_matches = @_ ? shift : 10;
509
510     # optional callback to monitor status 
511     # of job
512     my $progress_interval = 0;
513     my $progress_callback = undef;
514     if ($#_ == 1) {
515         $progress_interval = shift;
516         $progress_callback = shift;
517         $progress_interval = 0 unless $progress_interval =~ /^\d+$/ and $progress_interval > 0;
518         $progress_interval = 0 unless 'CODE' eq ref $progress_callback;
519     }
520
521     my $dbh = C4::Context->dbh;
522
523     my $sth = $dbh->prepare("SELECT import_record_id, record_type, marc
524                              FROM import_records
525                              WHERE import_batch_id = ?");
526     $sth->execute($batch_id);
527     my $num_with_matches = 0;
528     my $rec_num = 0;
529     while (my $rowref = $sth->fetchrow_hashref) {
530         $rec_num++;
531         if ($progress_interval and (0 == ($rec_num % $progress_interval))) {
532             &$progress_callback($rec_num);
533         }
534         my $marc_record = MARC::Record->new_from_usmarc($rowref->{'marc'});
535         my @matches = ();
536         if (defined $matcher) {
537             @matches = $matcher->get_matches($marc_record, $max_matches);
538         }
539         if (scalar(@matches) > 0) {
540             $num_with_matches++;
541             SetImportRecordMatches($rowref->{'import_record_id'}, @matches);
542             SetImportRecordOverlayStatus($rowref->{'import_record_id'}, 'auto_match');
543         } else {
544             SetImportRecordMatches($rowref->{'import_record_id'}, ());
545             SetImportRecordOverlayStatus($rowref->{'import_record_id'}, 'no_match');
546         }
547     }
548     $sth->finish();
549     return $num_with_matches;
550 }
551
552 =head2 BatchCommitRecords
553
554   my ($num_added, $num_updated, $num_items_added, $num_items_replaced, $num_items_errored, $num_ignored) =
555         BatchCommitRecords($batch_id, $framework,
556         $progress_interval, $progress_callback);
557
558 =cut
559
560 sub BatchCommitRecords {
561     my $batch_id = shift;
562     my $framework = shift;
563
564     # optional callback to monitor status 
565     # of job
566     my $progress_interval = 0;
567     my $progress_callback = undef;
568     if ($#_ == 1) {
569         $progress_interval = shift;
570         $progress_callback = shift;
571         $progress_interval = 0 unless $progress_interval =~ /^\d+$/ and $progress_interval > 0;
572         $progress_interval = 0 unless 'CODE' eq ref $progress_callback;
573     }
574
575     my $record_type;
576     my $num_added = 0;
577     my $num_updated = 0;
578     my $num_items_added = 0;
579     my $num_items_replaced = 0;
580     my $num_items_errored = 0;
581     my $num_ignored = 0;
582     # commit (i.e., save, all records in the batch)
583     SetImportBatchStatus('importing');
584     my $overlay_action = GetImportBatchOverlayAction($batch_id);
585     my $nomatch_action = GetImportBatchNoMatchAction($batch_id);
586     my $item_action = GetImportBatchItemAction($batch_id);
587     my $item_tag;
588     my $item_subfield;
589     my $dbh = C4::Context->dbh;
590     my $sth = $dbh->prepare("SELECT import_records.import_record_id, record_type, status, overlay_status, marc, encoding
591                              FROM import_records
592                              LEFT JOIN import_auths ON (import_records.import_record_id=import_auths.import_record_id)
593                              LEFT JOIN import_biblios ON (import_records.import_record_id=import_biblios.import_record_id)
594                              WHERE import_batch_id = ?");
595     $sth->execute($batch_id);
596     my $marcflavour = C4::Context->preference('marcflavour');
597     my $rec_num = 0;
598     while (my $rowref = $sth->fetchrow_hashref) {
599         $record_type = $rowref->{'record_type'};
600         $rec_num++;
601         if ($progress_interval and (0 == ($rec_num % $progress_interval))) {
602             &$progress_callback($rec_num);
603         }
604         if ($rowref->{'status'} eq 'error' or $rowref->{'status'} eq 'imported') {
605             $num_ignored++;
606             next;
607         }
608
609         my $marc_type;
610         if ($marcflavour eq 'UNIMARC' && $record_type eq 'auth') {
611             $marc_type = 'UNIMARCAUTH';
612         } elsif ($marcflavour eq 'UNIMARC') {
613             $marc_type = 'UNIMARC';
614         } else {
615             $marc_type = 'USMARC';
616         }
617         my $marc_record = MARC::Record->new_from_usmarc($rowref->{'marc'});
618
619         if ($record_type eq 'biblio') {
620             # remove any item tags - rely on BatchCommitItems
621             ($item_tag,$item_subfield) = &GetMarcFromKohaField("items.itemnumber",'');
622             foreach my $item_field ($marc_record->field($item_tag)) {
623                 $marc_record->delete_field($item_field);
624             }
625         }
626
627         my ($record_result, $item_result, $record_match) =
628             _get_commit_action($overlay_action, $nomatch_action, $item_action, 
629                                $rowref->{'overlay_status'}, $rowref->{'import_record_id'}, $record_type);
630
631         my $recordid;
632         my $query;
633         if ($record_result eq 'create_new') {
634             $num_added++;
635             if ($record_type eq 'biblio') {
636                 my $biblioitemnumber;
637                 ($recordid, $biblioitemnumber) = AddBiblio($marc_record, $framework);
638                 $query = "UPDATE import_biblios SET matched_biblionumber = ? WHERE import_record_id = ?";
639                 if ($item_result eq 'create_new' || $item_result eq 'replace') {
640                     my ($bib_items_added, $bib_items_replaced, $bib_items_errored) = BatchCommitItems($rowref->{'import_record_id'}, $recordid, $item_result);
641                     $num_items_added += $bib_items_added;
642                     $num_items_replaced += $bib_items_replaced;
643                     $num_items_errored += $bib_items_errored;
644                 }
645             } else {
646                 $recordid = AddAuthority($marc_record, undef, GuessAuthTypeCode($marc_record));
647                 $query = "UPDATE import_auths SET matched_authid = ? WHERE import_record_id = ?";
648             }
649             my $sth = $dbh->prepare_cached($query);
650             $sth->execute($recordid, $rowref->{'import_record_id'});
651             $sth->finish();
652             SetImportRecordStatus($rowref->{'import_record_id'}, 'imported');
653         } elsif ($record_result eq 'replace') {
654             $num_updated++;
655             $recordid = $record_match;
656             my $oldxml;
657             if ($record_type eq 'biblio') {
658                 my $oldbiblio = Koha::Biblios->find( $recordid );
659                 $oldxml = GetXmlBiblio($recordid);
660
661                 # remove item fields so that they don't get
662                 # added again if record is reverted
663                 # FIXME: GetXmlBiblio output should not contain item info any more! So the next foreach should not be needed. Does not hurt either; may remove old 952s that should not have been there anymore.
664                 my $old_marc = MARC::Record->new_from_xml(StripNonXmlChars($oldxml), 'UTF-8', $rowref->{'encoding'}, $marc_type);
665                 foreach my $item_field ($old_marc->field($item_tag)) {
666                     $old_marc->delete_field($item_field);
667                 }
668                 $oldxml = $old_marc->as_xml($marc_type);
669
670                 ModBiblio($marc_record, $recordid, $oldbiblio->frameworkcode);
671                 $query = "UPDATE import_biblios SET matched_biblionumber = ? WHERE import_record_id = ?";
672
673                 if ($item_result eq 'create_new' || $item_result eq 'replace') {
674                     my ($bib_items_added, $bib_items_replaced, $bib_items_errored) = BatchCommitItems($rowref->{'import_record_id'}, $recordid, $item_result);
675                     $num_items_added += $bib_items_added;
676                     $num_items_replaced += $bib_items_replaced;
677                     $num_items_errored += $bib_items_errored;
678                 }
679             } else {
680                 $oldxml = GetAuthorityXML($recordid);
681
682                 ModAuthority($recordid, $marc_record, GuessAuthTypeCode($marc_record));
683                 $query = "UPDATE import_auths SET matched_authid = ? WHERE import_record_id = ?";
684             }
685             my $sth = $dbh->prepare_cached("UPDATE import_records SET marcxml_old = ? WHERE import_record_id = ?");
686             $sth->execute($oldxml, $rowref->{'import_record_id'});
687             $sth->finish();
688             my $sth2 = $dbh->prepare_cached($query);
689             $sth2->execute($recordid, $rowref->{'import_record_id'});
690             $sth2->finish();
691             SetImportRecordOverlayStatus($rowref->{'import_record_id'}, 'match_applied');
692             SetImportRecordStatus($rowref->{'import_record_id'}, 'imported');
693         } elsif ($record_result eq 'ignore') {
694             $recordid = $record_match;
695             $num_ignored++;
696             $recordid = $record_match;
697             if ($record_type eq 'biblio' and defined $recordid and ( $item_result eq 'create_new' || $item_result eq 'replace' ) ) {
698                 my ($bib_items_added, $bib_items_replaced, $bib_items_errored) = BatchCommitItems($rowref->{'import_record_id'}, $recordid, $item_result);
699                 $num_items_added += $bib_items_added;
700          $num_items_replaced += $bib_items_replaced;
701                 $num_items_errored += $bib_items_errored;
702                 # still need to record the matched biblionumber so that the
703                 # items can be reverted
704                 my $sth2 = $dbh->prepare_cached("UPDATE import_biblios SET matched_biblionumber = ? WHERE import_record_id = ?");
705                 $sth2->execute($recordid, $rowref->{'import_record_id'});
706                 SetImportRecordOverlayStatus($rowref->{'import_record_id'}, 'match_applied');
707             }
708             SetImportRecordStatus($rowref->{'import_record_id'}, 'ignored');
709         }
710     }
711     $sth->finish();
712     SetImportBatchStatus($batch_id, 'imported');
713     return ($num_added, $num_updated, $num_items_added, $num_items_replaced, $num_items_errored, $num_ignored);
714 }
715
716 =head2 BatchCommitItems
717
718   ($num_items_added, $num_items_errored) = 
719          BatchCommitItems($import_record_id, $biblionumber);
720
721 =cut
722
723 sub BatchCommitItems {
724     my ( $import_record_id, $biblionumber, $action ) = @_;
725
726     my $dbh = C4::Context->dbh;
727
728     my $num_items_added = 0;
729     my $num_items_errored = 0;
730     my $num_items_replaced = 0;
731
732     my $sth = $dbh->prepare( "
733         SELECT import_items_id, import_items.marcxml, encoding
734         FROM import_items
735         JOIN import_records USING (import_record_id)
736         WHERE import_record_id = ?
737         ORDER BY import_items_id
738     " );
739     $sth->bind_param( 1, $import_record_id );
740     $sth->execute();
741
742     while ( my $row = $sth->fetchrow_hashref() ) {
743         my $item_marc = MARC::Record->new_from_xml( StripNonXmlChars( $row->{'marcxml'} ), 'UTF-8', $row->{'encoding'} );
744
745         # Delete date_due subfield as to not accidentally delete item checkout due dates
746         my ( $MARCfield, $MARCsubfield ) = GetMarcFromKohaField( 'items.onloan', GetFrameworkCode($biblionumber) );
747         $item_marc->field($MARCfield)->delete_subfield( code => $MARCsubfield );
748
749         my $item = TransformMarcToKoha( $item_marc );
750
751         my $duplicate_barcode = exists( $item->{'barcode'} ) && Koha::Items->find({ barcode => $item->{'barcode'} });
752         my $duplicate_itemnumber = exists( $item->{'itemnumber'} );
753
754         my $updsth = $dbh->prepare("UPDATE import_items SET status = ?, itemnumber = ? WHERE import_items_id = ?");
755         if ( $action eq "replace" && $duplicate_itemnumber ) {
756             # Duplicate itemnumbers have precedence, that way we can update barcodes by overlaying
757             ModItemFromMarc( $item_marc, $biblionumber, $item->{itemnumber} );
758             $updsth->bind_param( 1, 'imported' );
759             $updsth->bind_param( 2, $item->{itemnumber} );
760             $updsth->bind_param( 3, $row->{'import_items_id'} );
761             $updsth->execute();
762             $updsth->finish();
763             $num_items_replaced++;
764         } elsif ( $action eq "replace" && $duplicate_barcode ) {
765             my $itemnumber = $duplicate_barcode->itemnumber;
766             ModItemFromMarc( $item_marc, $biblionumber, $itemnumber );
767             $updsth->bind_param( 1, 'imported' );
768             $updsth->bind_param( 2, $item->{itemnumber} );
769             $updsth->bind_param( 3, $row->{'import_items_id'} );
770             $updsth->execute();
771             $updsth->finish();
772             $num_items_replaced++;
773         } elsif ($duplicate_barcode) {
774             $updsth->bind_param( 1, 'error' );
775             $updsth->bind_param( 2, 'duplicate item barcode' );
776             $updsth->bind_param( 3, $row->{'import_items_id'} );
777             $updsth->execute();
778             $num_items_errored++;
779         } else {
780             my ( $item_biblionumber, $biblioitemnumber, $itemnumber ) = AddItemFromMarc( $item_marc, $biblionumber );
781             if( $itemnumber ) {
782                 $updsth->bind_param( 1, 'imported' );
783                 $updsth->bind_param( 2, $itemnumber );
784                 $updsth->bind_param( 3, $row->{'import_items_id'} );
785                 $updsth->execute();
786                 $updsth->finish();
787                 $num_items_added++;
788             }
789         }
790     }
791
792     return ( $num_items_added, $num_items_replaced, $num_items_errored );
793 }
794
795 =head2 BatchRevertRecords
796
797   my ($num_deleted, $num_errors, $num_reverted, $num_items_deleted, 
798       $num_ignored) = BatchRevertRecords($batch_id);
799
800 =cut
801
802 sub BatchRevertRecords {
803     my $batch_id = shift;
804
805     $logger->trace("C4::ImportBatch::BatchRevertRecords( $batch_id )");
806
807     my $record_type;
808     my $num_deleted = 0;
809     my $num_errors = 0;
810     my $num_reverted = 0;
811     my $num_ignored = 0;
812     my $num_items_deleted = 0;
813     # commit (i.e., save, all records in the batch)
814     SetImportBatchStatus('reverting');
815     my $overlay_action = GetImportBatchOverlayAction($batch_id);
816     my $nomatch_action = GetImportBatchNoMatchAction($batch_id);
817     my $dbh = C4::Context->dbh;
818     my $sth = $dbh->prepare("SELECT import_records.import_record_id, record_type, status, overlay_status, marcxml_old, encoding, matched_biblionumber, matched_authid
819                              FROM import_records
820                              LEFT JOIN import_auths ON (import_records.import_record_id=import_auths.import_record_id)
821                              LEFT JOIN import_biblios ON (import_records.import_record_id=import_biblios.import_record_id)
822                              WHERE import_batch_id = ?");
823     $sth->execute($batch_id);
824     my $marc_type;
825     my $marcflavour = C4::Context->preference('marcflavour');
826     while (my $rowref = $sth->fetchrow_hashref) {
827         $record_type = $rowref->{'record_type'};
828         if ($rowref->{'status'} eq 'error' or $rowref->{'status'} eq 'reverted') {
829             $num_ignored++;
830             next;
831         }
832         if ($marcflavour eq 'UNIMARC' && $record_type eq 'auth') {
833             $marc_type = 'UNIMARCAUTH';
834         } elsif ($marcflavour eq 'UNIMARC') {
835             $marc_type = 'UNIMARC';
836         } else {
837             $marc_type = 'USMARC';
838         }
839
840         my $record_result = _get_revert_action($overlay_action, $rowref->{'overlay_status'}, $rowref->{'status'});
841
842         if ($record_result eq 'delete') {
843             my $error = undef;
844             if  ($record_type eq 'biblio') {
845                 $num_items_deleted += BatchRevertItems($rowref->{'import_record_id'}, $rowref->{'matched_biblionumber'});
846                 $error = DelBiblio($rowref->{'matched_biblionumber'});
847             } else {
848                 DelAuthority({ authid => $rowref->{'matched_authid'} });
849             }
850             if (defined $error) {
851                 $num_errors++;
852             } else {
853                 $num_deleted++;
854                 SetImportRecordStatus($rowref->{'import_record_id'}, 'reverted');
855             }
856         } elsif ($record_result eq 'restore') {
857             $num_reverted++;
858             my $old_record = MARC::Record->new_from_xml(StripNonXmlChars($rowref->{'marcxml_old'}), 'UTF-8', $rowref->{'encoding'}, $marc_type);
859             if ($record_type eq 'biblio') {
860                 my $biblionumber = $rowref->{'matched_biblionumber'};
861                 my $oldbiblio = Koha::Biblios->find( $biblionumber );
862
863                 $logger->info("C4::ImportBatch::BatchRevertRecords: Biblio record $biblionumber does not exist, restoration of this record was skipped") unless $oldbiblio;
864                 next unless $oldbiblio; # Record has since been deleted. Deleted records should stay deleted.
865
866                 $num_items_deleted += BatchRevertItems($rowref->{'import_record_id'}, $rowref->{'matched_biblionumber'});
867                 ModBiblio($old_record, $biblionumber, $oldbiblio->frameworkcode);
868             } else {
869                 my $authid = $rowref->{'matched_authid'};
870                 ModAuthority($authid, $old_record, GuessAuthTypeCode($old_record));
871             }
872             SetImportRecordStatus($rowref->{'import_record_id'}, 'reverted');
873         } elsif ($record_result eq 'ignore') {
874             if ($record_type eq 'biblio') {
875                 $num_items_deleted += BatchRevertItems($rowref->{'import_record_id'}, $rowref->{'matched_biblionumber'});
876             }
877             SetImportRecordStatus($rowref->{'import_record_id'}, 'reverted');
878         }
879         my $query;
880         if ($record_type eq 'biblio') {
881             # remove matched_biblionumber only if there is no 'imported' item left
882             $query = "UPDATE import_biblios SET matched_biblionumber = NULL WHERE import_record_id = ?";
883             $query = "UPDATE import_biblios SET matched_biblionumber = NULL WHERE import_record_id = ?  AND NOT EXISTS (SELECT * FROM import_items WHERE import_items.import_record_id=import_biblios.import_record_id and status='imported')";
884         } else {
885             $query = "UPDATE import_auths SET matched_authid = NULL WHERE import_record_id = ?";
886         }
887         my $sth2 = $dbh->prepare_cached($query);
888         $sth2->execute($rowref->{'import_record_id'});
889     }
890
891     $sth->finish();
892     SetImportBatchStatus($batch_id, 'reverted');
893     return ($num_deleted, $num_errors, $num_reverted, $num_items_deleted, $num_ignored);
894 }
895
896 =head2 BatchRevertItems
897
898   my $num_items_deleted = BatchRevertItems($import_record_id, $biblionumber);
899
900 =cut
901
902 sub BatchRevertItems {
903     my ($import_record_id, $biblionumber) = @_;
904
905     my $dbh = C4::Context->dbh;
906     my $num_items_deleted = 0;
907
908     my $sth = $dbh->prepare_cached("SELECT import_items_id, itemnumber
909                                    FROM import_items
910                                    JOIN items USING (itemnumber)
911                                    WHERE import_record_id = ?");
912     $sth->bind_param(1, $import_record_id);
913     $sth->execute();
914     while (my $row = $sth->fetchrow_hashref()) {
915         my $error = DelItemCheck( $biblionumber, $row->{'itemnumber'});
916         if ($error == 1){
917             my $updsth = $dbh->prepare("UPDATE import_items SET status = ? WHERE import_items_id = ?");
918             $updsth->bind_param(1, 'reverted');
919             $updsth->bind_param(2, $row->{'import_items_id'});
920             $updsth->execute();
921             $updsth->finish();
922             $num_items_deleted++;
923         }
924         else {
925             next;
926         }
927     }
928     $sth->finish();
929     return $num_items_deleted;
930 }
931
932 =head2 CleanBatch
933
934   CleanBatch($batch_id)
935
936 Deletes all staged records from the import batch
937 and sets the status of the batch to 'cleaned'.  Note
938 that deleting a stage record does *not* affect
939 any record that has been committed to the database.
940
941 =cut
942
943 sub CleanBatch {
944     my $batch_id = shift;
945     return unless defined $batch_id;
946
947     C4::Context->dbh->do('DELETE FROM import_records WHERE import_batch_id = ?', {}, $batch_id);
948     SetImportBatchStatus($batch_id, 'cleaned');
949 }
950
951 =head2 DeleteBatch
952
953   DeleteBatch($batch_id)
954
955 Deletes the record from the database. This can only be done
956 once the batch has been cleaned.
957
958 =cut
959
960 sub DeleteBatch {
961     my $batch_id = shift;
962     return unless defined $batch_id;
963
964     my $dbh = C4::Context->dbh;
965     my $sth = $dbh->prepare('DELETE FROM import_batches WHERE import_batch_id = ?');
966     $sth->execute( $batch_id );
967 }
968
969 =head2 GetAllImportBatches
970
971   my $results = GetAllImportBatches();
972
973 Returns a references to an array of hash references corresponding
974 to all import_batches rows (of batch_type 'batch'), sorted in 
975 ascending order by import_batch_id.
976
977 =cut
978
979 sub  GetAllImportBatches {
980     my $dbh = C4::Context->dbh;
981     my $sth = $dbh->prepare_cached("SELECT * FROM import_batches
982                                     WHERE batch_type IN ('batch', 'webservice')
983                                     ORDER BY import_batch_id ASC");
984
985     my $results = [];
986     $sth->execute();
987     while (my $row = $sth->fetchrow_hashref) {
988         push @$results, $row;
989     }
990     $sth->finish();
991     return $results;
992 }
993
994 =head2 GetStagedWebserviceBatches
995
996   my $batch_ids = GetStagedWebserviceBatches();
997
998 Returns a references to an array of batch id's
999 of batch_type 'webservice' that are not imported
1000
1001 =cut
1002
1003 my $PENDING_WEBSERVICE_BATCHES_QRY = <<EOQ;
1004 SELECT import_batch_id FROM import_batches
1005 WHERE batch_type = 'webservice'
1006 AND import_status = 'staged'
1007 EOQ
1008 sub  GetStagedWebserviceBatches {
1009     my $dbh = C4::Context->dbh;
1010     return $dbh->selectcol_arrayref($PENDING_WEBSERVICE_BATCHES_QRY);
1011 }
1012
1013 =head2 GetImportBatchRangeDesc
1014
1015   my $results = GetImportBatchRangeDesc($offset, $results_per_group);
1016
1017 Returns a reference to an array of hash references corresponding to
1018 import_batches rows (sorted in descending order by import_batch_id)
1019 start at the given offset.
1020
1021 =cut
1022
1023 sub GetImportBatchRangeDesc {
1024     my ($offset, $results_per_group) = @_;
1025
1026     my $dbh = C4::Context->dbh;
1027     my $query = "SELECT * FROM import_batches
1028                                     WHERE batch_type IN ('batch', 'webservice')
1029                                     ORDER BY import_batch_id DESC";
1030     my @params;
1031     if ($results_per_group){
1032         $query .= " LIMIT ?";
1033         push(@params, $results_per_group);
1034     }
1035     if ($offset){
1036         $query .= " OFFSET ?";
1037         push(@params, $offset);
1038     }
1039     my $sth = $dbh->prepare_cached($query);
1040     $sth->execute(@params);
1041     my $results = $sth->fetchall_arrayref({});
1042     $sth->finish();
1043     return $results;
1044 }
1045
1046 =head2 GetItemNumbersFromImportBatch
1047
1048   my @itemsnos = GetItemNumbersFromImportBatch($batch_id);
1049
1050 =cut
1051
1052 sub GetItemNumbersFromImportBatch {
1053     my ($batch_id) = @_;
1054     my $dbh = C4::Context->dbh;
1055     my $sql = q|
1056 SELECT itemnumber FROM import_items
1057 INNER JOIN items USING (itemnumber)
1058 INNER JOIN import_records USING (import_record_id)
1059 WHERE import_batch_id = ?|;
1060     my  $sth = $dbh->prepare( $sql );
1061     $sth->execute($batch_id);
1062     my @items ;
1063     while ( my ($itm) = $sth->fetchrow_array ) {
1064         push @items, $itm;
1065     }
1066     return @items;
1067 }
1068
1069 =head2 GetNumberOfImportBatches
1070
1071   my $count = GetNumberOfImportBatches();
1072
1073 =cut
1074
1075 sub GetNumberOfNonZ3950ImportBatches {
1076     my $dbh = C4::Context->dbh;
1077     my $sth = $dbh->prepare("SELECT COUNT(*) FROM import_batches WHERE batch_type != 'z3950'");
1078     $sth->execute();
1079     my ($count) = $sth->fetchrow_array();
1080     $sth->finish();
1081     return $count;
1082 }
1083
1084 =head2 GetImportBiblios
1085
1086   my $results = GetImportBiblios($importid);
1087
1088 =cut
1089
1090 sub GetImportBiblios {
1091     my ($import_record_id) = @_;
1092
1093     my $dbh = C4::Context->dbh;
1094     my $query = "SELECT * FROM import_biblios WHERE import_record_id = ?";
1095     return $dbh->selectall_arrayref(
1096         $query,
1097         { Slice => {} },
1098         $import_record_id
1099     );
1100
1101 }
1102
1103 =head2 GetImportRecordsRange
1104
1105   my $results = GetImportRecordsRange($batch_id, $offset, $results_per_group);
1106
1107 Returns a reference to an array of hash references corresponding to
1108 import_biblios/import_auths/import_records rows for a given batch
1109 starting at the given offset.
1110
1111 =cut
1112
1113 sub GetImportRecordsRange {
1114     my ( $batch_id, $offset, $results_per_group, $status, $parameters ) = @_;
1115
1116     my $dbh = C4::Context->dbh;
1117
1118     my $order_by = $parameters->{order_by} || 'import_record_id';
1119     ( $order_by ) = grep( /^$order_by$/, qw( import_record_id title status overlay_status ) ) ? $order_by : 'import_record_id';
1120
1121     my $order_by_direction =
1122       uc( $parameters->{order_by_direction} // 'ASC' ) eq 'DESC' ? 'DESC' : 'ASC';
1123
1124     $order_by .= " $order_by_direction, authorized_heading" if $order_by eq 'title';
1125
1126     my $query = "SELECT title, author, isbn, issn, authorized_heading, import_records.import_record_id,
1127                                            record_sequence, status, overlay_status,
1128                                            matched_biblionumber, matched_authid, record_type
1129                                     FROM   import_records
1130                                     LEFT JOIN import_auths ON (import_records.import_record_id=import_auths.import_record_id)
1131                                     LEFT JOIN import_biblios ON (import_records.import_record_id=import_biblios.import_record_id)
1132                                     WHERE  import_batch_id = ?";
1133     my @params;
1134     push(@params, $batch_id);
1135     if ($status) {
1136         $query .= " AND status=?";
1137         push(@params,$status);
1138     }
1139
1140     $query.=" ORDER BY $order_by $order_by_direction";
1141
1142     if($results_per_group){
1143         $query .= " LIMIT ?";
1144         push(@params, $results_per_group);
1145     }
1146     if($offset){
1147         $query .= " OFFSET ?";
1148         push(@params, $offset);
1149     }
1150     my $sth = $dbh->prepare_cached($query);
1151     $sth->execute(@params);
1152     my $results = $sth->fetchall_arrayref({});
1153     $sth->finish();
1154     return $results;
1155
1156 }
1157
1158 =head2 GetBestRecordMatch
1159
1160   my $record_id = GetBestRecordMatch($import_record_id);
1161
1162 =cut
1163
1164 sub GetBestRecordMatch {
1165     my ($import_record_id) = @_;
1166
1167     my $dbh = C4::Context->dbh;
1168     my $sth = $dbh->prepare("SELECT candidate_match_id
1169                              FROM   import_record_matches
1170                              JOIN   import_records ON ( import_record_matches.import_record_id = import_records.import_record_id )
1171                              LEFT JOIN biblio ON ( candidate_match_id = biblio.biblionumber )
1172                              LEFT JOIN auth_header ON ( candidate_match_id = auth_header.authid )
1173                              WHERE  import_record_matches.import_record_id = ? AND
1174                              (  (import_records.record_type = 'biblio' AND biblio.biblionumber IS NOT NULL) OR
1175                                 (import_records.record_type = 'auth' AND auth_header.authid IS NOT NULL) )
1176                              ORDER BY score DESC, candidate_match_id DESC");
1177     $sth->execute($import_record_id);
1178     my ($record_id) = $sth->fetchrow_array();
1179     $sth->finish();
1180     return $record_id;
1181 }
1182
1183 =head2 GetImportBatchStatus
1184
1185   my $status = GetImportBatchStatus($batch_id);
1186
1187 =cut
1188
1189 sub GetImportBatchStatus {
1190     my ($batch_id) = @_;
1191
1192     my $dbh = C4::Context->dbh;
1193     my $sth = $dbh->prepare("SELECT import_status FROM import_batches WHERE import_batch_id = ?");
1194     $sth->execute($batch_id);
1195     my ($status) = $sth->fetchrow_array();
1196     $sth->finish();
1197     return $status;
1198
1199 }
1200
1201 =head2 SetImportBatchStatus
1202
1203   SetImportBatchStatus($batch_id, $new_status);
1204
1205 =cut
1206
1207 sub SetImportBatchStatus {
1208     my ($batch_id, $new_status) = @_;
1209
1210     my $dbh = C4::Context->dbh;
1211     my $sth = $dbh->prepare("UPDATE import_batches SET import_status = ? WHERE import_batch_id = ?");
1212     $sth->execute($new_status, $batch_id);
1213     $sth->finish();
1214
1215 }
1216
1217 =head2 GetImportBatchOverlayAction
1218
1219   my $overlay_action = GetImportBatchOverlayAction($batch_id);
1220
1221 =cut
1222
1223 sub GetImportBatchOverlayAction {
1224     my ($batch_id) = @_;
1225
1226     my $dbh = C4::Context->dbh;
1227     my $sth = $dbh->prepare("SELECT overlay_action FROM import_batches WHERE import_batch_id = ?");
1228     $sth->execute($batch_id);
1229     my ($overlay_action) = $sth->fetchrow_array();
1230     $sth->finish();
1231     return $overlay_action;
1232
1233 }
1234
1235
1236 =head2 SetImportBatchOverlayAction
1237
1238   SetImportBatchOverlayAction($batch_id, $new_overlay_action);
1239
1240 =cut
1241
1242 sub SetImportBatchOverlayAction {
1243     my ($batch_id, $new_overlay_action) = @_;
1244
1245     my $dbh = C4::Context->dbh;
1246     my $sth = $dbh->prepare("UPDATE import_batches SET overlay_action = ? WHERE import_batch_id = ?");
1247     $sth->execute($new_overlay_action, $batch_id);
1248     $sth->finish();
1249
1250 }
1251
1252 =head2 GetImportBatchNoMatchAction
1253
1254   my $nomatch_action = GetImportBatchNoMatchAction($batch_id);
1255
1256 =cut
1257
1258 sub GetImportBatchNoMatchAction {
1259     my ($batch_id) = @_;
1260
1261     my $dbh = C4::Context->dbh;
1262     my $sth = $dbh->prepare("SELECT nomatch_action FROM import_batches WHERE import_batch_id = ?");
1263     $sth->execute($batch_id);
1264     my ($nomatch_action) = $sth->fetchrow_array();
1265     $sth->finish();
1266     return $nomatch_action;
1267
1268 }
1269
1270
1271 =head2 SetImportBatchNoMatchAction
1272
1273   SetImportBatchNoMatchAction($batch_id, $new_nomatch_action);
1274
1275 =cut
1276
1277 sub SetImportBatchNoMatchAction {
1278     my ($batch_id, $new_nomatch_action) = @_;
1279
1280     my $dbh = C4::Context->dbh;
1281     my $sth = $dbh->prepare("UPDATE import_batches SET nomatch_action = ? WHERE import_batch_id = ?");
1282     $sth->execute($new_nomatch_action, $batch_id);
1283     $sth->finish();
1284
1285 }
1286
1287 =head2 GetImportBatchItemAction
1288
1289   my $item_action = GetImportBatchItemAction($batch_id);
1290
1291 =cut
1292
1293 sub GetImportBatchItemAction {
1294     my ($batch_id) = @_;
1295
1296     my $dbh = C4::Context->dbh;
1297     my $sth = $dbh->prepare("SELECT item_action FROM import_batches WHERE import_batch_id = ?");
1298     $sth->execute($batch_id);
1299     my ($item_action) = $sth->fetchrow_array();
1300     $sth->finish();
1301     return $item_action;
1302
1303 }
1304
1305
1306 =head2 SetImportBatchItemAction
1307
1308   SetImportBatchItemAction($batch_id, $new_item_action);
1309
1310 =cut
1311
1312 sub SetImportBatchItemAction {
1313     my ($batch_id, $new_item_action) = @_;
1314
1315     my $dbh = C4::Context->dbh;
1316     my $sth = $dbh->prepare("UPDATE import_batches SET item_action = ? WHERE import_batch_id = ?");
1317     $sth->execute($new_item_action, $batch_id);
1318     $sth->finish();
1319
1320 }
1321
1322 =head2 GetImportBatchMatcher
1323
1324   my $matcher_id = GetImportBatchMatcher($batch_id);
1325
1326 =cut
1327
1328 sub GetImportBatchMatcher {
1329     my ($batch_id) = @_;
1330
1331     my $dbh = C4::Context->dbh;
1332     my $sth = $dbh->prepare("SELECT matcher_id FROM import_batches WHERE import_batch_id = ?");
1333     $sth->execute($batch_id);
1334     my ($matcher_id) = $sth->fetchrow_array();
1335     $sth->finish();
1336     return $matcher_id;
1337
1338 }
1339
1340
1341 =head2 SetImportBatchMatcher
1342
1343   SetImportBatchMatcher($batch_id, $new_matcher_id);
1344
1345 =cut
1346
1347 sub SetImportBatchMatcher {
1348     my ($batch_id, $new_matcher_id) = @_;
1349
1350     my $dbh = C4::Context->dbh;
1351     my $sth = $dbh->prepare("UPDATE import_batches SET matcher_id = ? WHERE import_batch_id = ?");
1352     $sth->execute($new_matcher_id, $batch_id);
1353     $sth->finish();
1354
1355 }
1356
1357 =head2 GetImportRecordOverlayStatus
1358
1359   my $overlay_status = GetImportRecordOverlayStatus($import_record_id);
1360
1361 =cut
1362
1363 sub GetImportRecordOverlayStatus {
1364     my ($import_record_id) = @_;
1365
1366     my $dbh = C4::Context->dbh;
1367     my $sth = $dbh->prepare("SELECT overlay_status FROM import_records WHERE import_record_id = ?");
1368     $sth->execute($import_record_id);
1369     my ($overlay_status) = $sth->fetchrow_array();
1370     $sth->finish();
1371     return $overlay_status;
1372
1373 }
1374
1375
1376 =head2 SetImportRecordOverlayStatus
1377
1378   SetImportRecordOverlayStatus($import_record_id, $new_overlay_status);
1379
1380 =cut
1381
1382 sub SetImportRecordOverlayStatus {
1383     my ($import_record_id, $new_overlay_status) = @_;
1384
1385     my $dbh = C4::Context->dbh;
1386     my $sth = $dbh->prepare("UPDATE import_records SET overlay_status = ? WHERE import_record_id = ?");
1387     $sth->execute($new_overlay_status, $import_record_id);
1388     $sth->finish();
1389
1390 }
1391
1392 =head2 GetImportRecordStatus
1393
1394   my $status = GetImportRecordStatus($import_record_id);
1395
1396 =cut
1397
1398 sub GetImportRecordStatus {
1399     my ($import_record_id) = @_;
1400
1401     my $dbh = C4::Context->dbh;
1402     my $sth = $dbh->prepare("SELECT status FROM import_records WHERE import_record_id = ?");
1403     $sth->execute($import_record_id);
1404     my ($status) = $sth->fetchrow_array();
1405     $sth->finish();
1406     return $status;
1407
1408 }
1409
1410
1411 =head2 SetImportRecordStatus
1412
1413   SetImportRecordStatus($import_record_id, $new_status);
1414
1415 =cut
1416
1417 sub SetImportRecordStatus {
1418     my ($import_record_id, $new_status) = @_;
1419
1420     my $dbh = C4::Context->dbh;
1421     my $sth = $dbh->prepare("UPDATE import_records SET status = ? WHERE import_record_id = ?");
1422     $sth->execute($new_status, $import_record_id);
1423     $sth->finish();
1424
1425 }
1426
1427 =head2 GetImportRecordMatches
1428
1429   my $results = GetImportRecordMatches($import_record_id, $best_only);
1430
1431 =cut
1432
1433 sub GetImportRecordMatches {
1434     my $import_record_id = shift;
1435     my $best_only = @_ ? shift : 0;
1436
1437     my $dbh = C4::Context->dbh;
1438     # FIXME currently biblio only
1439     my $sth = $dbh->prepare_cached("SELECT title, author, biblionumber,
1440                                     candidate_match_id, score, record_type
1441                                     FROM import_records
1442                                     JOIN import_record_matches USING (import_record_id)
1443                                     LEFT JOIN biblio ON (biblionumber = candidate_match_id)
1444                                     WHERE import_record_id = ?
1445                                     ORDER BY score DESC, biblionumber DESC");
1446     $sth->bind_param(1, $import_record_id);
1447     my $results = [];
1448     $sth->execute();
1449     while (my $row = $sth->fetchrow_hashref) {
1450         if ($row->{'record_type'} eq 'auth') {
1451             $row->{'authorized_heading'} = C4::AuthoritiesMarc::GetAuthorizedHeading( { authid => $row->{'candidate_match_id'} } );
1452         }
1453         next if ($row->{'record_type'} eq 'biblio' && not $row->{'biblionumber'});
1454         push @$results, $row;
1455         last if $best_only;
1456     }
1457     $sth->finish();
1458
1459     return $results;
1460     
1461 }
1462
1463 =head2 SetImportRecordMatches
1464
1465   SetImportRecordMatches($import_record_id, @matches);
1466
1467 =cut
1468
1469 sub SetImportRecordMatches {
1470     my $import_record_id = shift;
1471     my @matches = @_;
1472
1473     my $dbh = C4::Context->dbh;
1474     my $delsth = $dbh->prepare("DELETE FROM import_record_matches WHERE import_record_id = ?");
1475     $delsth->execute($import_record_id);
1476     $delsth->finish();
1477
1478     my $sth = $dbh->prepare("INSERT INTO import_record_matches (import_record_id, candidate_match_id, score)
1479                                     VALUES (?, ?, ?)");
1480     foreach my $match (@matches) {
1481         $sth->execute($import_record_id, $match->{'record_id'}, $match->{'score'});
1482     }
1483 }
1484
1485 =head2 RecordsFromISO2709File
1486
1487     my ($errors, $records) = C4::ImportBatch::RecordsFromISO2709File($input_file, $record_type, $encoding);
1488
1489 Reads ISO2709 binary porridge from the given file and creates MARC::Record-objects out of it.
1490
1491 @PARAM1, String, absolute path to the ISO2709 file.
1492 @PARAM2, String, see stage_file.pl
1493 @PARAM3, String, should be utf8
1494
1495 Returns two array refs.
1496
1497 =cut
1498
1499 sub RecordsFromISO2709File {
1500     my ($input_file, $record_type, $encoding) = @_;
1501     my @errors;
1502
1503     my $marc_type = C4::Context->preference('marcflavour');
1504     $marc_type .= 'AUTH' if ($marc_type eq 'UNIMARC' && $record_type eq 'auth');
1505
1506     open IN, "<$input_file" or die "$0: cannot open input file $input_file: $!\n";
1507     my @marc_records;
1508     $/ = "\035";
1509     while (<IN>) {
1510         s/^\s+//;
1511         s/\s+$//;
1512         next unless $_; # skip if record has only whitespace, as might occur
1513                         # if file includes newlines between each MARC record
1514         my ($marc_record, $charset_guessed, $char_errors) = MarcToUTF8Record($_, $marc_type, $encoding);
1515         push @marc_records, $marc_record;
1516         if ($charset_guessed ne $encoding) {
1517             push @errors,
1518                 "Unexpected charset $charset_guessed, expecting $encoding";
1519         }
1520     }
1521     close IN;
1522     return ( \@errors, \@marc_records );
1523 }
1524
1525 =head2 RecordsFromMARCXMLFile
1526
1527     my ($errors, $records) = C4::ImportBatch::RecordsFromMARCXMLFile($input_file, $encoding);
1528
1529 Creates MARC::Record-objects out of the given MARCXML-file.
1530
1531 @PARAM1, String, absolute path to the ISO2709 file.
1532 @PARAM2, String, should be utf8
1533
1534 Returns two array refs.
1535
1536 =cut
1537
1538 sub RecordsFromMARCXMLFile {
1539     my ( $filename, $encoding ) = @_;
1540     my $batch = MARC::File::XML->in( $filename );
1541     my ( @marcRecords, @errors, $record );
1542     do {
1543         eval { $record = $batch->next( $encoding ); };
1544         if ($@) {
1545             push @errors, $@;
1546         }
1547         push @marcRecords, $record if $record;
1548     } while( $record );
1549     return (\@errors, \@marcRecords);
1550 }
1551
1552 =head2 RecordsFromMarcPlugin
1553
1554     Converts text of input_file into array of MARC records with to_marc plugin
1555
1556 =cut
1557
1558 sub RecordsFromMarcPlugin {
1559     my ($input_file, $plugin_class, $encoding) = @_;
1560     my ( $text, @return );
1561     return \@return if !$input_file || !$plugin_class;
1562
1563     # Read input file
1564     open IN, "<$input_file" or die "$0: cannot open input file $input_file: $!\n";
1565     $/ = "\035";
1566     while (<IN>) {
1567         s/^\s+//;
1568         s/\s+$//;
1569         next unless $_;
1570         $text .= $_;
1571     }
1572     close IN;
1573
1574     # Convert to large MARC blob with plugin
1575     $text = Koha::Plugins::Handler->run({
1576         class  => $plugin_class,
1577         method => 'to_marc',
1578         params => { data => $text },
1579     }) if $text;
1580
1581     # Convert to array of MARC records
1582     if( $text ) {
1583         my $marc_type = C4::Context->preference('marcflavour');
1584         foreach my $blob ( split(/\x1D/, $text) ) {
1585             next if $blob =~ /^\s*$/;
1586             my ($marcrecord) = MarcToUTF8Record($blob, $marc_type, $encoding);
1587             push @return, $marcrecord;
1588         }
1589     }
1590     return \@return;
1591 }
1592
1593 # internal functions
1594
1595 sub _create_import_record {
1596     my ($batch_id, $record_sequence, $marc_record, $record_type, $encoding, $z3950random, $marc_type) = @_;
1597
1598     my $dbh = C4::Context->dbh;
1599     my $sth = $dbh->prepare("INSERT INTO import_records (import_batch_id, record_sequence, marc, marcxml, marcxml_old,
1600                                                          record_type, encoding, z3950random)
1601                                     VALUES (?, ?, ?, ?, ?, ?, ?, ?)");
1602     $sth->execute($batch_id, $record_sequence, $marc_record->as_usmarc(), $marc_record->as_xml($marc_type), '',
1603                   $record_type, $encoding, $z3950random);
1604     my $import_record_id = $dbh->{'mysql_insertid'};
1605     $sth->finish();
1606     return $import_record_id;
1607 }
1608
1609 sub _update_import_record_marc {
1610     my ($import_record_id, $marc_record, $marc_type) = @_;
1611
1612     my $dbh = C4::Context->dbh;
1613     my $sth = $dbh->prepare("UPDATE import_records SET marc = ?, marcxml = ?
1614                              WHERE  import_record_id = ?");
1615     $sth->execute($marc_record->as_usmarc(), $marc_record->as_xml($marc_type), $import_record_id);
1616     $sth->finish();
1617 }
1618
1619 sub _add_auth_fields {
1620     my ($import_record_id, $marc_record) = @_;
1621
1622     my $controlnumber;
1623     if ($marc_record->field('001')) {
1624         $controlnumber = $marc_record->field('001')->data();
1625     }
1626     my $authorized_heading = C4::AuthoritiesMarc::GetAuthorizedHeading({ record => $marc_record });
1627     my $dbh = C4::Context->dbh;
1628     my $sth = $dbh->prepare("INSERT INTO import_auths (import_record_id, control_number, authorized_heading) VALUES (?, ?, ?)");
1629     $sth->execute($import_record_id, $controlnumber, $authorized_heading);
1630     $sth->finish();
1631 }
1632
1633 sub _add_biblio_fields {
1634     my ($import_record_id, $marc_record) = @_;
1635
1636     my ($title, $author, $isbn, $issn) = _parse_biblio_fields($marc_record);
1637     my $dbh = C4::Context->dbh;
1638     # FIXME no controlnumber, originalsource
1639     $isbn = C4::Koha::GetNormalizedISBN($isbn);
1640     my $sth = $dbh->prepare("INSERT INTO import_biblios (import_record_id, title, author, isbn, issn) VALUES (?, ?, ?, ?, ?)");
1641     $sth->execute($import_record_id, $title, $author, $isbn, $issn);
1642     $sth->finish();
1643                 
1644 }
1645
1646 sub _update_biblio_fields {
1647     my ($import_record_id, $marc_record) = @_;
1648
1649     my ($title, $author, $isbn, $issn) = _parse_biblio_fields($marc_record);
1650     my $dbh = C4::Context->dbh;
1651     # FIXME no controlnumber, originalsource
1652     # FIXME 2 - should regularize normalization of ISBN wherever it is done
1653     $isbn =~ s/\(.*$//;
1654     $isbn =~ tr/ -_//;
1655     $isbn = uc $isbn;
1656     my $sth = $dbh->prepare("UPDATE import_biblios SET title = ?, author = ?, isbn = ?, issn = ?
1657                              WHERE  import_record_id = ?");
1658     $sth->execute($title, $author, $isbn, $issn, $import_record_id);
1659     $sth->finish();
1660 }
1661
1662 sub _parse_biblio_fields {
1663     my ($marc_record) = @_;
1664
1665     my $dbh = C4::Context->dbh;
1666     my $bibliofields = TransformMarcToKoha($marc_record, '');
1667     return ($bibliofields->{'title'}, $bibliofields->{'author'}, $bibliofields->{'isbn'}, $bibliofields->{'issn'});
1668
1669 }
1670
1671 sub _update_batch_record_counts {
1672     my ($batch_id) = @_;
1673
1674     my $dbh = C4::Context->dbh;
1675     my $sth = $dbh->prepare_cached("UPDATE import_batches SET
1676                                         num_records = (
1677                                             SELECT COUNT(*)
1678                                             FROM import_records
1679                                             WHERE import_batch_id = import_batches.import_batch_id),
1680                                         num_items = (
1681                                             SELECT COUNT(*)
1682                                             FROM import_records
1683                                             JOIN import_items USING (import_record_id)
1684                                             WHERE import_batch_id = import_batches.import_batch_id
1685                                             AND record_type = 'biblio')
1686                                     WHERE import_batch_id = ?");
1687     $sth->bind_param(1, $batch_id);
1688     $sth->execute();
1689     $sth->finish();
1690 }
1691
1692 sub _get_commit_action {
1693     my ($overlay_action, $nomatch_action, $item_action, $overlay_status, $import_record_id, $record_type) = @_;
1694     
1695     if ($record_type eq 'biblio') {
1696         my ($bib_result, $bib_match, $item_result);
1697
1698         if ($overlay_status ne 'no_match') {
1699             $bib_match = GetBestRecordMatch($import_record_id);
1700             if ($overlay_action eq 'replace') {
1701                 $bib_result  = defined($bib_match) ? 'replace' : 'create_new';
1702             } elsif ($overlay_action eq 'create_new') {
1703                 $bib_result  = 'create_new';
1704             } elsif ($overlay_action eq 'ignore') {
1705                 $bib_result  = 'ignore';
1706             }
1707          if($item_action eq 'always_add' or $item_action eq 'add_only_for_matches'){
1708                 $item_result = 'create_new';
1709        }
1710       elsif($item_action eq 'replace'){
1711           $item_result = 'replace';
1712           }
1713       else {
1714              $item_result = 'ignore';
1715            }
1716         } else {
1717             $bib_result = $nomatch_action;
1718             $item_result = ($item_action eq 'always_add' or $item_action eq 'add_only_for_new')     ? 'create_new' : 'ignore';
1719         }
1720         return ($bib_result, $item_result, $bib_match);
1721     } else { # must be auths
1722         my ($auth_result, $auth_match);
1723
1724         if ($overlay_status ne 'no_match') {
1725             $auth_match = GetBestRecordMatch($import_record_id);
1726             if ($overlay_action eq 'replace') {
1727                 $auth_result  = defined($auth_match) ? 'replace' : 'create_new';
1728             } elsif ($overlay_action eq 'create_new') {
1729                 $auth_result  = 'create_new';
1730             } elsif ($overlay_action eq 'ignore') {
1731                 $auth_result  = 'ignore';
1732             }
1733         } else {
1734             $auth_result = $nomatch_action;
1735         }
1736
1737         return ($auth_result, undef, $auth_match);
1738
1739     }
1740 }
1741
1742 sub _get_revert_action {
1743     my ($overlay_action, $overlay_status, $status) = @_;
1744
1745     my $bib_result;
1746
1747     if ($status eq 'ignored') {
1748         $bib_result = 'ignore';
1749     } else {
1750         if ($overlay_action eq 'create_new') {
1751             $bib_result = 'delete';
1752         } else {
1753             $bib_result = ($overlay_status eq 'match_applied') ? 'restore' : 'delete';
1754         }
1755     }
1756     return $bib_result;
1757 }
1758
1759 1;
1760 __END__
1761
1762 =head1 AUTHOR
1763
1764 Koha Development Team <http://koha-community.org/>
1765
1766 Galen Charlton <galen.charlton@liblime.com>
1767
1768 =cut