LP#1979071: stamp DB update
[evergreen-equinox.git] / Open-ILS / src / sql / Pg / upgrade / 1369.schema.queued_ingest.sql
1 BEGIN;
2
3 SELECT evergreen.upgrade_deps_block_check('1369', :eg_version);
4
5 INSERT INTO config.global_flag (name, enabled, label) VALUES (
6     'ingest.queued.max_threads',  TRUE,
7     oils_i18n_gettext(
8         'ingest.queued.max_threads',
9         'Queued Ingest: Maximum number of database workers allowed for queued ingest processes',
10         'cgf',
11         'label'
12     )),(
13     'ingest.queued.abort_on_error',  FALSE,
14     oils_i18n_gettext(
15         'ingest.queued.abort_on_error',
16         'Queued Ingest: Abort transaction on ingest error rather than simply logging an error',
17         'cgf',
18         'label'
19     )),(
20     'ingest.queued.authority.propagate',  FALSE,
21     oils_i18n_gettext(
22         'ingest.queued.authority.propagate',
23         'Queued Ingest: Queue all bib record updates on authority change propagation, even if bib queuing is not generally enabled',
24         'cgf',
25         'label'
26     )),(
27     'ingest.queued.all',  FALSE,
28     oils_i18n_gettext(
29         'ingest.queued.all',
30         'Queued Ingest: Use Queued Ingest for all bib and authority record ingest',
31         'cgf',
32         'label'
33     )),(
34     'ingest.queued.biblio.all',  FALSE,
35     oils_i18n_gettext(
36         'ingest.queued.biblio.all',
37         'Queued Ingest: Use Queued Ingest for all bib record ingest',
38         'cgf',
39         'label'
40     )),(
41     'ingest.queued.authority.all',  FALSE,
42     oils_i18n_gettext(
43         'ingest.queued.authority.all',
44         'Queued Ingest: Use Queued Ingest for all authority record ingest',
45         'cgf',
46         'label'
47     )),(
48     'ingest.queued.biblio.insert.marc_edit_inline',  TRUE,
49     oils_i18n_gettext(
50         'ingest.queued.biblio.insert.marc_edit_inline',
51         'Queued Ingest: Do NOT use Queued Ingest when creating a new bib, or undeleting a bib, via the MARC editor',
52         'cgf',
53         'label'
54     )),(
55     'ingest.queued.biblio.insert',  FALSE,
56     oils_i18n_gettext(
57         'ingest.queued.biblio.insert',
58         'Queued Ingest: Use Queued Ingest for bib record ingest on insert and undelete',
59         'cgf',
60         'label'
61     )),(
62     'ingest.queued.authority.insert',  FALSE,
63     oils_i18n_gettext(
64         'ingest.queued.authority.insert',
65         'Queued Ingest: Use Queued Ingest for authority record ingest on insert and undelete',
66         'cgf',
67         'label'
68     )),(
69     'ingest.queued.biblio.update.marc_edit_inline',  TRUE,
70     oils_i18n_gettext(
71         'ingest.queued.biblio.update.marc_edit_inline',
72         'Queued Ingest: Do NOT Use Queued Ingest when editing bib records via the MARC Editor',
73         'cgf',
74         'label'
75     )),(
76     'ingest.queued.biblio.update',  FALSE,
77     oils_i18n_gettext(
78         'ingest.queued.biblio.update',
79         'Queued Ingest: Use Queued Ingest for bib record ingest on update',
80         'cgf',
81         'label'
82     )),(
83     'ingest.queued.authority.update',  FALSE,
84     oils_i18n_gettext(
85         'ingest.queued.authority.update',
86         'Queued Ingest: Use Queued Ingest for authority record ingest on update',
87         'cgf',
88         'label'
89     )),(
90     'ingest.queued.biblio.delete',  FALSE,
91     oils_i18n_gettext(
92         'ingest.queued.biblio.delete',
93         'Queued Ingest: Use Queued Ingest for bib record ingest on delete',
94         'cgf',
95         'label'
96     )),(
97     'ingest.queued.authority.delete',  FALSE,
98     oils_i18n_gettext(
99         'ingest.queued.authority.delete',
100         'Queued Ingest: Use Queued Ingest for authority record ingest on delete',
101         'cgf',
102         'label'
103     )
104 );
105
106 UPDATE config.global_flag SET value = '20' WHERE name = 'ingest.queued.max_threads';
107
108 CREATE OR REPLACE FUNCTION search.symspell_maintain_entries () RETURNS TRIGGER AS $f$
109 DECLARE
110     search_class    TEXT;
111     new_value       TEXT := NULL;
112     old_value       TEXT := NULL;
113     _atag           INTEGER;
114 BEGIN
115
116     IF TG_TABLE_SCHEMA = 'authority' THEN
117         IF TG_OP IN ('INSERT', 'UPDATE') THEN
118             _atag = NEW.atag;
119         ELSE
120             _atag = OLD.atag;
121         END IF;
122
123         SELECT  m.field_class INTO search_class
124           FROM  authority.control_set_auth_field_metabib_field_map_refs a
125                 JOIN config.metabib_field m ON (a.metabib_field=m.id)
126           WHERE a.authority_field = _atag;
127
128         IF NOT FOUND THEN
129             RETURN NULL;
130         END IF;
131     ELSE
132         search_class := COALESCE(TG_ARGV[0], SPLIT_PART(TG_TABLE_NAME,'_',1));
133     END IF;
134
135     IF TG_OP IN ('INSERT', 'UPDATE') THEN
136         new_value := NEW.value;
137     END IF;
138
139     IF TG_OP IN ('DELETE', 'UPDATE') THEN
140         old_value := OLD.value;
141     END IF;
142
143     IF new_value = old_value THEN
144         -- same, move along
145     ELSE
146         INSERT INTO search.symspell_dictionary_updates
147             SELECT  txid_current(), *
148               FROM  search.symspell_build_entries(
149                         new_value,
150                         search_class,
151                         old_value
152                     );
153     END IF;
154
155     -- PERFORM * FROM search.symspell_build_and_merge_entries(new_value, search_class, old_value);
156
157     RETURN NULL; -- always fired AFTER
158 END;
159 $f$ LANGUAGE PLPGSQL;
160
161 CREATE TABLE action.ingest_queue (
162     id          SERIAL      PRIMARY KEY,
163     created     TIMESTAMPTZ NOT NULL DEFAULT NOW(),
164     run_at      TIMESTAMPTZ NOT NULL DEFAULT NOW(),
165     who         INT         REFERENCES actor.usr (id) ON UPDATE CASCADE ON DELETE SET NULL DEFERRABLE INITIALLY DEFERRED,
166     start_time  TIMESTAMPTZ,
167     end_time    TIMESTAMPTZ,
168     threads     INT,
169     why         TEXT
170 );
171
172 CREATE TABLE action.ingest_queue_entry (
173     id          BIGSERIAL   PRIMARY KEY,
174     record      BIGINT      NOT NULL, -- points to a record id of the appropriate record_type
175     record_type TEXT        NOT NULL,
176     action      TEXT        NOT NULL,
177     run_at      TIMESTAMPTZ NOT NULL DEFAULT NOW(),
178     state_data  TEXT        NOT NULL DEFAULT '',
179     queue       INT         REFERENCES action.ingest_queue (id) ON UPDATE CASCADE ON DELETE CASCADE DEFERRABLE INITIALLY DEFERRED,
180     override_by BIGINT      REFERENCES action.ingest_queue_entry (id) ON UPDATE CASCADE ON DELETE SET NULL DEFERRABLE INITIALLY DEFERRED,
181     ingest_time TIMESTAMPTZ,
182     fail_time   TIMESTAMPTZ
183 );
184 CREATE UNIQUE INDEX record_pending_once ON action.ingest_queue_entry (record_type,record,state_data) WHERE ingest_time IS NULL AND override_by IS NULL;
185 CREATE INDEX entry_override_by_idx ON action.ingest_queue_entry (override_by) WHERE override_by IS NOT NULL;
186
187 CREATE OR REPLACE FUNCTION action.enqueue_ingest_entry (
188     record_id       BIGINT,
189     rtype           TEXT DEFAULT 'biblio',
190     when_to_run     TIMESTAMPTZ DEFAULT NOW(),
191     queue_id        INT  DEFAULT NULL,
192     ingest_action   TEXT DEFAULT 'update', -- will be the most common?
193     old_state_data  TEXT DEFAULT ''
194 ) RETURNS BOOL AS $F$
195 DECLARE
196     new_entry       action.ingest_queue_entry%ROWTYPE;
197     prev_del_entry  action.ingest_queue_entry%ROWTYPE;
198     diag_detail     TEXT;
199     diag_context    TEXT;
200 BEGIN
201
202     IF ingest_action = 'delete' THEN
203         -- first see if there is an outstanding entry
204         SELECT  * INTO prev_del_entry
205           FROM  action.ingest_queue_entry
206           WHERE qe.record = record_id
207                 AND qe.state_date = old_state_data
208                 AND qe.record_type = rtype
209                 AND qe.ingest_time IS NULL
210                 AND qe.override_by IS NULL;
211     END IF;
212
213     WITH existing_queue_entry_cte AS (
214         SELECT  queue_id AS queue,
215                 rtype AS record_type,
216                 record_id AS record,
217                 qe.id AS override_by,
218                 ingest_action AS action,
219                 q.run_at AS run_at,
220                 old_state_data AS state_data
221           FROM  action.ingest_queue_entry qe
222                 JOIN action.ingest_queue q ON (qe.queue = q.id)
223           WHERE qe.record = record_id
224                 AND q.end_time IS NULL
225                 AND qe.record_type = rtype
226                 AND qe.state_data = old_state_data
227                 AND qe.ingest_time IS NULL
228                 AND qe.fail_time IS NULL
229                 AND qe.override_by IS NULL
230     ), existing_nonqueue_entry_cte AS (
231         SELECT  queue_id AS queue,
232                 rtype AS record_type,
233                 record_id AS record,
234                 qe.id AS override_by,
235                 ingest_action AS action,
236                 qe.run_at AS run_at,
237                 old_state_data AS state_data
238           FROM  action.ingest_queue_entry qe
239           WHERE qe.record = record_id
240                 AND qe.queue IS NULL
241                 AND qe.record_type = rtype
242                 AND qe.state_data = old_state_data
243                 AND qe.ingest_time IS NULL
244                 AND qe.fail_time IS NULL
245                 AND qe.override_by IS NULL
246     ), new_entry_cte AS (
247         SELECT * FROM existing_queue_entry_cte
248           UNION ALL
249         SELECT * FROM existing_nonqueue_entry_cte
250           UNION ALL
251         SELECT queue_id, rtype, record_id, NULL, ingest_action, COALESCE(when_to_run,NOW()), old_state_data
252     ), insert_entry_cte AS (
253         INSERT INTO action.ingest_queue_entry
254             (queue, record_type, record, override_by, action, run_at, state_data)
255           SELECT queue, record_type, record, override_by, action, run_at, state_data FROM new_entry_cte
256             ORDER BY 4 NULLS LAST, 6
257             LIMIT 1
258         RETURNING *
259     ) SELECT * INTO new_entry FROM insert_entry_cte;
260
261     IF prev_del_entry.id IS NOT NULL THEN -- later delete overrides earlier unapplied entry
262         UPDATE  action.ingest_queue_entry
263           SET   override_by = new_entry.id
264           WHERE id = prev_del_entry.id;
265
266         UPDATE  action.ingest_queue_entry
267           SET   override_by = NULL
268           WHERE id = new_entry.id;
269
270     ELSIF new_entry.override_by IS NOT NULL THEN
271         RETURN TRUE; -- already handled, don't notify
272     END IF;
273
274     NOTIFY queued_ingest;
275
276     RETURN TRUE;
277 EXCEPTION WHEN OTHERS THEN
278     GET STACKED DIAGNOSTICS diag_detail  = PG_EXCEPTION_DETAIL,
279                             diag_context = PG_EXCEPTION_CONTEXT;
280     RAISE WARNING '%\n%', diag_detail, diag_context;
281     RETURN FALSE;
282 END;
283 $F$ LANGUAGE PLPGSQL;
284
285 CREATE OR REPLACE FUNCTION action.process_ingest_queue_entry (qeid BIGINT) RETURNS BOOL AS $func$
286 DECLARE
287     ingest_success  BOOL := NULL;
288     qe              action.ingest_queue_entry%ROWTYPE;
289     aid             authority.record_entry.id%TYPE;
290 BEGIN
291
292     SELECT * INTO qe FROM action.ingest_queue_entry WHERE id = qeid;
293     IF qe.ingest_time IS NOT NULL OR qe.override_by IS NOT NULL THEN
294         RETURN TRUE; -- Already done
295     END IF;
296
297     IF qe.action = 'delete' THEN
298         IF qe.record_type = 'biblio' THEN
299             SELECT metabib.indexing_delete(r.*, qe.state_data) INTO ingest_success FROM biblio.record_entry r WHERE r.id = qe.record;
300         ELSIF qe.record_type = 'authority' THEN
301             SELECT authority.indexing_delete(r.*, qe.state_data) INTO ingest_success FROM authority.record_entry r WHERE r.id = qe.record;
302         END IF;
303     ELSE
304         IF qe.record_type = 'biblio' THEN
305             IF qe.action = 'propagate' THEN
306                 SELECT authority.apply_propagate_changes(qe.state_data::BIGINT, qe.record) INTO aid;
307                 SELECT aid = qe.state_data::BIGINT INTO ingest_success;
308             ELSE
309                 SELECT metabib.indexing_update(r.*, qe.action = 'insert', qe.state_data) INTO ingest_success FROM biblio.record_entry r WHERE r.id = qe.record;
310             END IF;
311         ELSIF qe.record_type = 'authority' THEN
312             SELECT authority.indexing_update(r.*, qe.action = 'insert', qe.state_data) INTO ingest_success FROM authority.record_entry r WHERE r.id = qe.record;
313         END IF;
314     END IF;
315
316     IF NOT ingest_success THEN
317         UPDATE action.ingest_queue_entry SET fail_time = NOW() WHERE id = qe.id;
318         PERFORM * FROM config.internal_flag WHERE name = 'ingest.queued.abort_on_error' AND enabled;
319         IF FOUND THEN
320             RAISE EXCEPTION 'Ingest action of % on %.record_entry % for queue entry % failed', qe.action, qe.record_type, qe.record, qe.id;
321         ELSE
322             RAISE WARNING 'Ingest action of % on %.record_entry % for queue entry % failed', qe.action, qe.record_type, qe.record, qe.id;
323         END IF;
324     ELSE
325         UPDATE action.ingest_queue_entry SET ingest_time = NOW() WHERE id = qe.id;
326     END IF;
327
328     RETURN ingest_success;
329 END;
330 $func$ LANGUAGE PLPGSQL;
331
332 CREATE OR REPLACE FUNCTION action.complete_duplicated_entries () RETURNS TRIGGER AS $F$
333 BEGIN
334     IF NEW.ingest_time IS NOT NULL THEN
335         UPDATE action.ingest_queue_entry SET ingest_time = NEW.ingest_time WHERE override_by = NEW.id;
336     END IF;
337
338     RETURN NULL;
339 END;
340 $F$ LANGUAGE PLPGSQL;
341
342 CREATE TRIGGER complete_duplicated_entries_trigger
343     AFTER UPDATE ON action.ingest_queue_entry
344     FOR EACH ROW WHEN (NEW.override_by IS NULL)
345     EXECUTE PROCEDURE action.complete_duplicated_entries();
346
347 CREATE OR REPLACE FUNCTION action.set_ingest_queue(INT) RETURNS VOID AS $$
348     $_SHARED{"ingest_queue_id"} = $_[0];
349 $$ LANGUAGE plperlu;
350
351 CREATE OR REPLACE FUNCTION action.get_ingest_queue() RETURNS INT AS $$
352     return $_SHARED{"ingest_queue_id"};
353 $$ LANGUAGE plperlu;
354
355 CREATE OR REPLACE FUNCTION action.clear_ingest_queue() RETURNS VOID AS $$
356     delete($_SHARED{"ingest_queue_id"});
357 $$ LANGUAGE plperlu;
358
359 CREATE OR REPLACE FUNCTION action.set_queued_ingest_force(TEXT) RETURNS VOID AS $$
360     $_SHARED{"ingest_queue_force"} = $_[0];
361 $$ LANGUAGE plperlu;
362
363 CREATE OR REPLACE FUNCTION action.get_queued_ingest_force() RETURNS TEXT AS $$
364     return $_SHARED{"ingest_queue_force"};
365 $$ LANGUAGE plperlu;
366
367 CREATE OR REPLACE FUNCTION action.clear_queued_ingest_force() RETURNS VOID AS $$
368     delete($_SHARED{"ingest_queue_force"});
369 $$ LANGUAGE plperlu;
370
371 ------------------ ingest functions ------------------
372
373 CREATE OR REPLACE FUNCTION metabib.indexing_delete (bib biblio.record_entry, extra TEXT DEFAULT NULL) RETURNS BOOL AS $func$
374 DECLARE
375     tmp_bool BOOL;
376     diag_detail     TEXT;
377     diag_context    TEXT;
378 BEGIN
379     PERFORM * FROM config.internal_flag WHERE name = 'ingest.metarecord_mapping.preserve_on_delete' AND enabled;
380     tmp_bool := FOUND;
381
382     PERFORM metabib.remap_metarecord_for_bib(bib.id, bib.fingerprint, TRUE, tmp_bool);
383
384     IF NOT tmp_bool THEN
385         -- One needs to keep these around to support searches
386         -- with the #deleted modifier, so one should turn on the named
387         -- internal flag for that functionality.
388         DELETE FROM metabib.record_attr_vector_list WHERE source = bib.id;
389     END IF;
390
391     DELETE FROM authority.bib_linking abl WHERE abl.bib = bib.id; -- Avoid updating fields in bibs that are no longer visible
392     DELETE FROM biblio.peer_bib_copy_map WHERE peer_record = bib.id; -- Separate any multi-homed items
393     DELETE FROM metabib.browse_entry_def_map WHERE source = bib.id; -- Don't auto-suggest deleted bibs
394
395     RETURN TRUE;
396 EXCEPTION WHEN OTHERS THEN
397     GET STACKED DIAGNOSTICS diag_detail  = PG_EXCEPTION_DETAIL,
398                             diag_context = PG_EXCEPTION_CONTEXT;
399     RAISE WARNING '%\n%', diag_detail, diag_context;
400     RETURN FALSE;
401 END;
402 $func$ LANGUAGE PLPGSQL;
403
404 CREATE OR REPLACE FUNCTION metabib.indexing_update (bib biblio.record_entry, insert_only BOOL DEFAULT FALSE, extra TEXT DEFAULT NULL) RETURNS BOOL AS $func$
405 DECLARE
406     skip_facet   BOOL   := FALSE;
407     skip_display BOOL   := FALSE;
408     skip_browse  BOOL   := FALSE;
409     skip_search  BOOL   := FALSE;
410     skip_auth    BOOL   := FALSE;
411     skip_full    BOOL   := FALSE;
412     skip_attrs   BOOL   := FALSE;
413     skip_luri    BOOL   := FALSE;
414     skip_mrmap   BOOL   := FALSE;
415     only_attrs   TEXT[] := NULL;
416     only_fields  INT[]  := '{}'::INT[];
417     diag_detail     TEXT;
418     diag_context    TEXT;
419 BEGIN
420
421     -- Record authority linking
422     SELECT extra LIKE '%skip_authority%' INTO skip_auth;
423     PERFORM * FROM config.internal_flag WHERE name = 'ingest.disable_authority_linking' AND enabled;
424     IF NOT FOUND AND NOT skip_auth THEN
425         PERFORM biblio.map_authority_linking( bib.id, bib.marc );
426     END IF;
427
428     -- Flatten and insert the mfr data
429     SELECT extra LIKE '%skip_full_rec%' INTO skip_full;
430     PERFORM * FROM config.internal_flag WHERE name = 'ingest.disable_metabib_full_rec' AND enabled;
431     IF NOT FOUND AND NOT skip_full THEN
432         PERFORM metabib.reingest_metabib_full_rec(bib.id);
433     END IF;
434
435     -- Now we pull out attribute data, which is dependent on the mfr for all but XPath-based fields
436     SELECT extra LIKE '%skip_attrs%' INTO skip_attrs;
437     PERFORM * FROM config.internal_flag WHERE name = 'ingest.disable_metabib_rec_descriptor' AND enabled;
438     IF NOT FOUND AND NOT skip_attrs THEN
439         IF extra ~ 'attr\(\s*(\w[ ,\w]*?)\s*\)' THEN
440             SELECT REGEXP_SPLIT_TO_ARRAY(
441                 (REGEXP_MATCHES(extra, 'attr\(\s*(\w[ ,\w]*?)\s*\)'))[1],
442                 '\s*,\s*'
443             ) INTO only_attrs;
444         END IF;
445
446         PERFORM metabib.reingest_record_attributes(bib.id, only_attrs, bib.marc, insert_only);
447     END IF;
448
449     -- Gather and insert the field entry data
450     SELECT extra LIKE '%skip_facet%' INTO skip_facet;
451     SELECT extra LIKE '%skip_display%' INTO skip_display;
452     SELECT extra LIKE '%skip_browse%' INTO skip_browse;
453     SELECT extra LIKE '%skip_search%' INTO skip_search;
454
455     IF extra ~ 'field_list\(\s*(\d[ ,\d]+)\s*\)' THEN
456         SELECT REGEXP_SPLIT_TO_ARRAY(
457             (REGEXP_MATCHES(extra, 'field_list\(\s*(\d[ ,\d]+)\s*\)'))[1],
458             '\s*,\s*'
459         )::INT[] INTO only_fields;
460     END IF;
461
462     IF NOT skip_facet OR NOT skip_display OR NOT skip_browse OR NOT skip_search THEN
463         PERFORM metabib.reingest_metabib_field_entries(bib.id, skip_facet, skip_display, skip_browse, skip_search, only_fields);
464     END IF;
465
466     -- Located URI magic
467     SELECT extra LIKE '%skip_luri%' INTO skip_luri;
468     PERFORM * FROM config.internal_flag WHERE name = 'ingest.disable_located_uri' AND enabled;
469     IF NOT FOUND AND NOT skip_luri THEN PERFORM biblio.extract_located_uris( bib.id, bib.marc, bib.editor ); END IF;
470
471     -- (re)map metarecord-bib linking
472     SELECT extra LIKE '%skip_mrmap%' INTO skip_mrmap;
473     IF insert_only THEN -- if not deleted and performing an insert, check for the flag
474         PERFORM * FROM config.internal_flag WHERE name = 'ingest.metarecord_mapping.skip_on_insert' AND enabled;
475         IF NOT FOUND AND NOT skip_mrmap THEN
476             PERFORM metabib.remap_metarecord_for_bib( bib.id, bib.fingerprint );
477         END IF;
478     ELSE -- we're doing an update, and we're not deleted, remap
479         PERFORM * FROM config.internal_flag WHERE name = 'ingest.metarecord_mapping.skip_on_update' AND enabled;
480         IF NOT FOUND AND NOT skip_mrmap THEN
481             PERFORM metabib.remap_metarecord_for_bib( bib.id, bib.fingerprint );
482         END IF;
483     END IF;
484
485     RETURN TRUE;
486 EXCEPTION WHEN OTHERS THEN
487     GET STACKED DIAGNOSTICS diag_detail  = PG_EXCEPTION_DETAIL,
488                             diag_context = PG_EXCEPTION_CONTEXT;
489     RAISE WARNING '%\n%', diag_detail, diag_context;
490     RETURN FALSE;
491 END;
492 $func$ LANGUAGE PLPGSQL;
493
494 CREATE OR REPLACE FUNCTION authority.indexing_delete (auth authority.record_entry, extra TEXT DEFAULT NULL) RETURNS BOOL AS $func$
495 DECLARE
496     tmp_bool BOOL;
497     diag_detail     TEXT;
498     diag_context    TEXT;
499 BEGIN
500     DELETE FROM authority.bib_linking WHERE authority = NEW.id; -- Avoid updating fields in bibs that are no longer visible
501     DELETE FROM authority.full_rec WHERE record = NEW.id; -- Avoid validating fields against deleted authority records
502     DELETE FROM authority.simple_heading WHERE record = NEW.id;
503       -- Should remove matching $0 from controlled fields at the same time?
504
505     -- XXX What do we about the actual linking subfields present in
506     -- authority records that target this one when this happens?
507     DELETE FROM authority.authority_linking WHERE source = NEW.id OR target = NEW.id;
508
509     RETURN TRUE;
510 EXCEPTION WHEN OTHERS THEN
511     GET STACKED DIAGNOSTICS diag_detail  = PG_EXCEPTION_DETAIL,
512                             diag_context = PG_EXCEPTION_CONTEXT;
513     RAISE WARNING '%\n%', diag_detail, diag_context;
514     RETURN FALSE;
515 END;
516 $func$ LANGUAGE PLPGSQL;
517
518
519 CREATE OR REPLACE FUNCTION authority.indexing_update (auth authority.record_entry, insert_only BOOL DEFAULT FALSE, old_heading TEXT DEFAULT NULL) RETURNS BOOL AS $func$
520 DECLARE
521     ashs    authority.simple_heading%ROWTYPE;
522     mbe_row metabib.browse_entry%ROWTYPE;
523     mbe_id  BIGINT;
524     ash_id  BIGINT;
525     diag_detail     TEXT;
526     diag_context    TEXT;
527 BEGIN
528
529     -- Unless there's a setting stopping us, propagate these updates to any linked bib records when the heading changes
530     PERFORM * FROM config.internal_flag WHERE name = 'ingest.disable_authority_auto_update' AND enabled;
531
532     IF NOT FOUND AND auth.heading <> old_heading THEN
533         PERFORM authority.propagate_changes(auth.id);
534     END IF;
535
536     IF NOT insert_only THEN
537         DELETE FROM authority.authority_linking WHERE source = auth.id;
538         DELETE FROM authority.simple_heading WHERE record = auth.id;
539     END IF;
540
541     INSERT INTO authority.authority_linking (source, target, field)
542         SELECT source, target, field FROM authority.calculate_authority_linking(
543             auth.id, auth.control_set, auth.marc::XML
544         );
545
546     FOR ashs IN SELECT * FROM authority.simple_heading_set(auth.marc) LOOP
547
548         INSERT INTO authority.simple_heading (record,atag,value,sort_value,thesaurus)
549             VALUES (ashs.record, ashs.atag, ashs.value, ashs.sort_value, ashs.thesaurus);
550             ash_id := CURRVAL('authority.simple_heading_id_seq'::REGCLASS);
551
552         SELECT INTO mbe_row * FROM metabib.browse_entry
553             WHERE value = ashs.value AND sort_value = ashs.sort_value;
554
555         IF FOUND THEN
556             mbe_id := mbe_row.id;
557         ELSE
558             INSERT INTO metabib.browse_entry
559                 ( value, sort_value ) VALUES
560                 ( ashs.value, ashs.sort_value );
561
562             mbe_id := CURRVAL('metabib.browse_entry_id_seq'::REGCLASS);
563         END IF;
564
565         INSERT INTO metabib.browse_entry_simple_heading_map (entry,simple_heading) VALUES (mbe_id,ash_id);
566
567     END LOOP;
568
569     -- Flatten and insert the afr data
570     PERFORM * FROM config.internal_flag WHERE name = 'ingest.disable_authority_full_rec' AND enabled;
571     IF NOT FOUND THEN
572         PERFORM authority.reingest_authority_full_rec(auth.id);
573         PERFORM * FROM config.internal_flag WHERE name = 'ingest.disable_authority_rec_descriptor' AND enabled;
574         IF NOT FOUND THEN
575             PERFORM authority.reingest_authority_rec_descriptor(auth.id);
576         END IF;
577     END IF;
578
579     PERFORM * FROM config.internal_flag WHERE name = 'ingest.disable_symspell_reification' AND enabled;
580     IF NOT FOUND THEN
581         PERFORM search.symspell_dictionary_reify();
582     END IF;
583
584     RETURN TRUE;
585 EXCEPTION WHEN OTHERS THEN
586     GET STACKED DIAGNOSTICS diag_detail  = PG_EXCEPTION_DETAIL,
587                             diag_context = PG_EXCEPTION_CONTEXT;
588     RAISE WARNING '%\n%', diag_detail, diag_context;
589     RETURN FALSE;
590 END;
591 $func$ LANGUAGE PLPGSQL;
592
593 CREATE OR REPLACE FUNCTION evergreen.indexing_ingest_or_delete () RETURNS TRIGGER AS $func$
594 DECLARE
595     old_state_data      TEXT := '';
596     new_action          TEXT;
597     queuing_force       TEXT;
598     queuing_flag_name   TEXT;
599     queuing_flag        BOOL := FALSE;
600     queuing_success     BOOL := FALSE;
601     ingest_success      BOOL := FALSE;
602     ingest_queue        INT;
603 BEGIN
604
605     -- Identify the ingest action type
606     IF TG_OP = 'UPDATE' THEN
607
608         -- Gather type-specific data for later use
609         IF TG_TABLE_SCHEMA = 'authority' THEN
610             old_state_data = OLD.heading;
611         END IF;
612
613         IF NOT OLD.deleted THEN -- maybe reingest?
614             IF NEW.deleted THEN
615                 new_action = 'delete'; -- nope, delete
616             ELSE
617                 new_action = 'update'; -- yes, update
618             END IF;
619         ELSIF NOT NEW.deleted THEN
620             new_action = 'insert'; -- revivify, AKA insert
621         ELSE
622             RETURN NEW; -- was and is still deleted, don't ingest
623         END IF;
624     ELSIF TG_OP = 'INSERT' THEN
625         new_action = 'insert'; -- brand new
626     ELSE
627         RETURN OLD; -- really deleting the record
628     END IF;
629
630     queuing_flag_name := 'ingest.queued.'||TG_TABLE_SCHEMA||'.'||new_action;
631     -- See if we should be queuing anything
632     SELECT  enabled INTO queuing_flag
633       FROM  config.internal_flag
634       WHERE name IN ('ingest.queued.all','ingest.queued.'||TG_TABLE_SCHEMA||'.all', queuing_flag_name)
635             AND enabled
636       LIMIT 1;
637
638     SELECT action.get_queued_ingest_force() INTO queuing_force;
639     IF queuing_flag IS NULL AND queuing_force = queuing_flag_name THEN
640         queuing_flag := TRUE;
641     END IF;
642
643     -- you (or part of authority propagation) can forcibly disable specific queuing actions
644     IF queuing_force = queuing_flag_name||'.disabled' THEN
645         queuing_flag := FALSE;
646     END IF;
647
648     -- And if we should be queuing ...
649     IF queuing_flag THEN
650         ingest_queue := action.get_ingest_queue();
651
652         -- ... but this is NOT a named or forced queue request (marc editor update, say, or vandelay overlay)...
653         IF queuing_force IS NULL AND ingest_queue IS NULL AND new_action = 'update' THEN -- re-ingest?
654
655             PERFORM * FROM config.internal_flag WHERE name = 'ingest.reingest.force_on_same_marc' AND enabled;
656
657             --  ... then don't do anything if ingest.reingest.force_on_same_marc is not enabled and the MARC hasn't changed
658             IF NOT FOUND AND OLD.marc = NEW.marc THEN
659                 RETURN NEW;
660             END IF;
661         END IF;
662
663         -- Otherwise, attempt to enqueue
664         SELECT action.enqueue_ingest_entry( NEW.id, TG_TABLE_SCHEMA, NOW(), ingest_queue, new_action, old_state_data) INTO queuing_success;
665     END IF;
666
667     -- If queuing was not requested, or failed for some reason, do it live.
668     IF NOT queuing_success THEN
669         IF queuing_flag THEN
670             RAISE WARNING 'Enqueuing of %.record_entry % for ingest failed, attempting direct ingest', TG_TABLE_SCHEMA, NEW.id;
671         END IF;
672
673         IF new_action = 'delete' THEN
674             IF TG_TABLE_SCHEMA = 'biblio' THEN
675                 SELECT metabib.indexing_delete(NEW.*, old_state_data) INTO ingest_success;
676             ELSIF TG_TABLE_SCHEMA = 'authority' THEN
677                 SELECT authority.indexing_delete(NEW.*, old_state_data) INTO ingest_success;
678             END IF;
679         ELSE
680             IF TG_TABLE_SCHEMA = 'biblio' THEN
681                 SELECT metabib.indexing_update(NEW.*, new_action = 'insert', old_state_data) INTO ingest_success;
682             ELSIF TG_TABLE_SCHEMA = 'authority' THEN
683                 SELECT authority.indexing_update(NEW.*, new_action = 'insert', old_state_data) INTO ingest_success;
684             END IF;
685         END IF;
686         
687         IF NOT ingest_success THEN
688             PERFORM * FROM config.internal_flag WHERE name = 'ingest.queued.abort_on_error' AND enabled;
689             IF FOUND THEN
690                 RAISE EXCEPTION 'Ingest of %.record_entry % failed', TG_TABLE_SCHEMA, NEW.id;
691             ELSE
692                 RAISE WARNING 'Ingest of %.record_entry % failed', TG_TABLE_SCHEMA, NEW.id;
693             END IF;
694         END IF;
695     END IF;
696
697     RETURN NEW;
698 END;
699 $func$ LANGUAGE PLPGSQL;
700
701 DROP TRIGGER aaa_indexing_ingest_or_delete ON biblio.record_entry;
702 DROP TRIGGER aaa_auth_ingest_or_delete ON authority.record_entry;
703
704 CREATE TRIGGER aaa_indexing_ingest_or_delete AFTER INSERT OR UPDATE ON biblio.record_entry FOR EACH ROW EXECUTE PROCEDURE evergreen.indexing_ingest_or_delete ();
705 CREATE TRIGGER aaa_auth_ingest_or_delete AFTER INSERT OR UPDATE ON authority.record_entry FOR EACH ROW EXECUTE PROCEDURE evergreen.indexing_ingest_or_delete ();
706
707 CREATE OR REPLACE FUNCTION metabib.reingest_record_attributes (rid BIGINT, pattr_list TEXT[] DEFAULT NULL, prmarc TEXT DEFAULT NULL, rdeleted BOOL DEFAULT TRUE) RETURNS VOID AS $func$
708 DECLARE
709     transformed_xml TEXT;
710     rmarc           TEXT := prmarc;
711     tmp_val         TEXT;
712     prev_xfrm       TEXT;
713     normalizer      RECORD;
714     xfrm            config.xml_transform%ROWTYPE;
715     attr_vector     INT[] := '{}'::INT[];
716     attr_vector_tmp INT[];
717     attr_list       TEXT[] := pattr_list;
718     attr_value      TEXT[];
719     norm_attr_value TEXT[];
720     tmp_xml         TEXT;
721     tmp_array       TEXT[];
722     attr_def        config.record_attr_definition%ROWTYPE;
723     ccvm_row        config.coded_value_map%ROWTYPE;
724     jump_past       BOOL;
725 BEGIN
726
727     IF attr_list IS NULL OR rdeleted THEN -- need to do the full dance on INSERT or undelete
728         SELECT ARRAY_AGG(name) INTO attr_list FROM config.record_attr_definition
729         WHERE (
730             tag IS NOT NULL OR
731             fixed_field IS NOT NULL OR
732             xpath IS NOT NULL OR
733             phys_char_sf IS NOT NULL OR
734             composite
735         ) AND (
736             filter OR sorter
737         );
738     END IF;
739
740     IF rmarc IS NULL THEN
741         SELECT marc INTO rmarc FROM biblio.record_entry WHERE id = rid;
742     END IF;
743
744     FOR attr_def IN SELECT * FROM config.record_attr_definition WHERE NOT composite AND name = ANY( attr_list ) ORDER BY format LOOP
745
746         jump_past := FALSE; -- This gets set when we are non-multi and have found something
747         attr_value := '{}'::TEXT[];
748         norm_attr_value := '{}'::TEXT[];
749         attr_vector_tmp := '{}'::INT[];
750
751         SELECT * INTO ccvm_row FROM config.coded_value_map c WHERE c.ctype = attr_def.name LIMIT 1;
752
753         IF attr_def.tag IS NOT NULL THEN -- tag (and optional subfield list) selection
754             SELECT  ARRAY_AGG(value) INTO attr_value
755               FROM  (SELECT * FROM metabib.full_rec ORDER BY tag, subfield) AS x
756               WHERE record = rid
757                     AND tag LIKE attr_def.tag
758                     AND CASE
759                         WHEN attr_def.sf_list IS NOT NULL
760                             THEN POSITION(subfield IN attr_def.sf_list) > 0
761                         ELSE TRUE
762                     END
763               GROUP BY tag
764               ORDER BY tag;
765
766             IF NOT attr_def.multi THEN
767                 attr_value := ARRAY[ARRAY_TO_STRING(attr_value, COALESCE(attr_def.joiner,' '))];
768                 jump_past := TRUE;
769             END IF;
770         END IF;
771
772         IF NOT jump_past AND attr_def.fixed_field IS NOT NULL THEN -- a named fixed field, see config.marc21_ff_pos_map.fixed_field
773             attr_value := attr_value || vandelay.marc21_extract_fixed_field_list(rmarc, attr_def.fixed_field);
774
775             IF NOT attr_def.multi THEN
776                 attr_value := ARRAY[attr_value[1]];
777                 jump_past := TRUE;
778             END IF;
779         END IF;
780
781         IF NOT jump_past AND attr_def.xpath IS NOT NULL THEN -- and xpath expression
782
783             SELECT INTO xfrm * FROM config.xml_transform WHERE name = attr_def.format;
784
785             -- See if we can skip the XSLT ... it's expensive
786             IF prev_xfrm IS NULL OR prev_xfrm <> xfrm.name THEN
787                 -- Can't skip the transform
788                 IF xfrm.xslt <> '---' THEN
789                     transformed_xml := oils_xslt_process(rmarc,xfrm.xslt);
790                 ELSE
791                     transformed_xml := rmarc;
792                 END IF;
793
794                 prev_xfrm := xfrm.name;
795             END IF;
796
797             IF xfrm.name IS NULL THEN
798                 -- just grab the marcxml (empty) transform
799                 SELECT INTO xfrm * FROM config.xml_transform WHERE xslt = '---' LIMIT 1;
800                 prev_xfrm := xfrm.name;
801             END IF;
802
803             FOR tmp_xml IN SELECT UNNEST(oils_xpath(attr_def.xpath, transformed_xml, ARRAY[ARRAY[xfrm.prefix, xfrm.namespace_uri]])) LOOP
804                 tmp_val := oils_xpath_string(
805                                 '//*',
806                                 tmp_xml,
807                                 COALESCE(attr_def.joiner,' '),
808                                 ARRAY[ARRAY[xfrm.prefix, xfrm.namespace_uri]]
809                             );
810                 IF tmp_val IS NOT NULL AND BTRIM(tmp_val) <> '' THEN
811                     attr_value := attr_value || tmp_val;
812                     EXIT WHEN NOT attr_def.multi;
813                 END IF;
814             END LOOP;
815         END IF;
816
817         IF NOT jump_past AND attr_def.phys_char_sf IS NOT NULL THEN -- a named Physical Characteristic, see config.marc21_physical_characteristic_*_map
818             SELECT  ARRAY_AGG(m.value) INTO tmp_array
819               FROM  vandelay.marc21_physical_characteristics(rmarc) v
820                     LEFT JOIN config.marc21_physical_characteristic_value_map m ON (m.id = v.value)
821               WHERE v.subfield = attr_def.phys_char_sf AND (m.value IS NOT NULL AND BTRIM(m.value) <> '')
822                     AND ( ccvm_row.id IS NULL OR ( ccvm_row.id IS NOT NULL AND v.id IS NOT NULL) );
823
824             attr_value := attr_value || tmp_array;
825
826             IF NOT attr_def.multi THEN
827                 attr_value := ARRAY[attr_value[1]];
828             END IF;
829
830         END IF;
831
832                 -- apply index normalizers to attr_value
833         FOR tmp_val IN SELECT value FROM UNNEST(attr_value) x(value) LOOP
834             FOR normalizer IN
835                 SELECT  n.func AS func,
836                         n.param_count AS param_count,
837                         m.params AS params
838                   FROM  config.index_normalizer n
839                         JOIN config.record_attr_index_norm_map m ON (m.norm = n.id)
840                   WHERE attr = attr_def.name
841                   ORDER BY m.pos LOOP
842                     EXECUTE 'SELECT ' || normalizer.func || '(' ||
843                     COALESCE( quote_literal( tmp_val ), 'NULL' ) ||
844                         CASE
845                             WHEN normalizer.param_count > 0
846                                 THEN ',' || REPLACE(REPLACE(BTRIM(normalizer.params,'[]'),E'\'',E'\\\''),E'"',E'\'')
847                                 ELSE ''
848                             END ||
849                     ')' INTO tmp_val;
850
851             END LOOP;
852             IF tmp_val IS NOT NULL AND tmp_val <> '' THEN
853                 -- note that a string that contains only blanks
854                 -- is a valid value for some attributes
855                 norm_attr_value := norm_attr_value || tmp_val;
856             END IF;
857         END LOOP;
858
859         IF attr_def.filter THEN
860             -- Create unknown uncontrolled values and find the IDs of the values
861             IF ccvm_row.id IS NULL THEN
862                 FOR tmp_val IN SELECT value FROM UNNEST(norm_attr_value) x(value) LOOP
863                     IF tmp_val IS NOT NULL AND BTRIM(tmp_val) <> '' THEN
864                         BEGIN -- use subtransaction to isolate unique constraint violations
865                             INSERT INTO metabib.uncontrolled_record_attr_value ( attr, value ) VALUES ( attr_def.name, tmp_val );
866                         EXCEPTION WHEN unique_violation THEN END;
867                     END IF;
868                 END LOOP;
869
870                 SELECT ARRAY_AGG(id) INTO attr_vector_tmp FROM metabib.uncontrolled_record_attr_value WHERE attr = attr_def.name AND value = ANY( norm_attr_value );
871             ELSE
872                 SELECT ARRAY_AGG(id) INTO attr_vector_tmp FROM config.coded_value_map WHERE ctype = attr_def.name AND code = ANY( norm_attr_value );
873             END IF;
874
875             -- Add the new value to the vector
876             attr_vector := attr_vector || attr_vector_tmp;
877         END IF;
878
879         IF attr_def.sorter THEN
880             DELETE FROM metabib.record_sorter WHERE source = rid AND attr = attr_def.name;
881             IF norm_attr_value[1] IS NOT NULL THEN
882                 INSERT INTO metabib.record_sorter (source, attr, value) VALUES (rid, attr_def.name, norm_attr_value[1]);
883             END IF;
884         END IF;
885
886     END LOOP;
887
888 /* We may need to rewrite the vlist to contain
889    the intersection of new values for requested
890    attrs and old values for ignored attrs. To
891    do this, we take the old attr vlist and
892    subtract any values that are valid for the
893    requested attrs, and then add back the new
894    set of attr values. */
895
896     IF ARRAY_LENGTH(pattr_list, 1) > 0 THEN
897         SELECT vlist INTO attr_vector_tmp FROM metabib.record_attr_vector_list WHERE source = rid;
898         SELECT attr_vector_tmp - ARRAY_AGG(id::INT) INTO attr_vector_tmp FROM metabib.full_attr_id_map WHERE attr = ANY (pattr_list);
899         attr_vector := attr_vector || attr_vector_tmp;
900     END IF;
901
902     -- On to composite attributes, now that the record attrs have been pulled.  Processed in name order, so later composite
903     -- attributes can depend on earlier ones.
904     PERFORM metabib.compile_composite_attr_cache_init();
905     FOR attr_def IN SELECT * FROM config.record_attr_definition WHERE composite AND name = ANY( attr_list ) ORDER BY name LOOP
906
907         FOR ccvm_row IN SELECT * FROM config.coded_value_map c WHERE c.ctype = attr_def.name ORDER BY value LOOP
908
909             tmp_val := metabib.compile_composite_attr( ccvm_row.id );
910             CONTINUE WHEN tmp_val IS NULL OR tmp_val = ''; -- nothing to do
911
912             IF attr_def.filter THEN
913                 IF attr_vector @@ tmp_val::query_int THEN
914                     attr_vector = attr_vector + intset(ccvm_row.id);
915                     EXIT WHEN NOT attr_def.multi;
916                 END IF;
917             END IF;
918
919             IF attr_def.sorter THEN
920                 IF attr_vector @@ tmp_val THEN
921                     DELETE FROM metabib.record_sorter WHERE source = rid AND attr = attr_def.name;
922                     INSERT INTO metabib.record_sorter (source, attr, value) VALUES (rid, attr_def.name, ccvm_row.code);
923                 END IF;
924             END IF;
925
926         END LOOP;
927
928     END LOOP;
929
930     IF ARRAY_LENGTH(attr_vector, 1) > 0 THEN
931         INSERT INTO metabib.record_attr_vector_list (source, vlist) VALUES (rid, attr_vector)
932             ON CONFLICT (source) DO UPDATE SET vlist = EXCLUDED.vlist;
933     END IF;
934
935 END;
936
937 $func$ LANGUAGE PLPGSQL;
938
939 CREATE OR REPLACE FUNCTION authority.propagate_changes
940     (aid BIGINT, bid BIGINT) RETURNS BIGINT AS $func$
941 DECLARE
942     queuing_success BOOL := FALSE;
943 BEGIN
944
945     PERFORM 1 FROM config.global_flag
946         WHERE name IN ('ingest.queued.all','ingest.queued.authority.propagate')
947             AND enabled;
948
949     IF FOUND THEN
950         -- XXX enqueue special 'propagate' bib action
951         SELECT action.enqueue_ingest_entry( bid, 'biblio', NOW(), NULL, 'propagate', aid::TEXT) INTO queuing_success;
952
953         IF queuing_success THEN
954             RETURN aid;
955         END IF;
956     END IF;
957
958     PERFORM authority.apply_propagate_changes(aid, bid);
959     RETURN aid;
960 END;
961 $func$ LANGUAGE PLPGSQL;
962
963 CREATE OR REPLACE FUNCTION authority.apply_propagate_changes
964     (aid BIGINT, bid BIGINT) RETURNS BIGINT AS $func$
965 DECLARE
966     bib_forced  BOOL := FALSE;
967     bib_rec     biblio.record_entry%ROWTYPE;
968     new_marc    TEXT;
969 BEGIN
970
971     SELECT INTO bib_rec * FROM biblio.record_entry WHERE id = bid;
972
973     new_marc := vandelay.merge_record_xml(
974         bib_rec.marc, authority.generate_overlay_template(aid));
975
976     IF new_marc = bib_rec.marc THEN
977         -- Authority record change had no impact on this bib record.
978         -- Nothing left to do.
979         RETURN aid;
980     END IF;
981
982     PERFORM 1 FROM config.global_flag
983         WHERE name = 'ingest.disable_authority_auto_update_bib_meta'
984             AND enabled;
985
986     IF NOT FOUND THEN
987         -- update the bib record editor and edit_date
988         bib_rec.editor := (
989             SELECT editor FROM authority.record_entry WHERE id = aid);
990         bib_rec.edit_date = NOW();
991     END IF;
992
993     PERFORM action.set_queued_ingest_force('ingest.queued.biblio.update.disabled');
994
995     UPDATE biblio.record_entry SET
996         marc = new_marc,
997         editor = bib_rec.editor,
998         edit_date = bib_rec.edit_date
999     WHERE id = bid;
1000
1001     PERFORM action.clear_queued_ingest_force();
1002
1003     RETURN aid;
1004
1005 END;
1006 $func$ LANGUAGE PLPGSQL;
1007
1008 CREATE OR REPLACE FUNCTION metabib.reingest_metabib_field_entries(
1009     bib_id BIGINT,
1010     skip_facet BOOL DEFAULT FALSE,
1011     skip_display BOOL DEFAULT FALSE,
1012     skip_browse BOOL DEFAULT FALSE,
1013     skip_search BOOL DEFAULT FALSE,
1014     only_fields INT[] DEFAULT '{}'::INT[]
1015 ) RETURNS VOID AS $func$
1016 DECLARE
1017     fclass          RECORD;
1018     ind_data        metabib.field_entry_template%ROWTYPE;
1019     mbe_row         metabib.browse_entry%ROWTYPE;
1020     mbe_id          BIGINT;
1021     b_skip_facet    BOOL;
1022     b_skip_display    BOOL;
1023     b_skip_browse   BOOL;
1024     b_skip_search   BOOL;
1025     value_prepped   TEXT;
1026     field_list      INT[] := only_fields;
1027     field_types     TEXT[] := '{}'::TEXT[];
1028 BEGIN
1029
1030     IF field_list = '{}'::INT[] THEN
1031         SELECT ARRAY_AGG(id) INTO field_list FROM config.metabib_field;
1032     END IF;
1033
1034     SELECT COALESCE(NULLIF(skip_facet, FALSE), EXISTS (SELECT enabled FROM config.internal_flag WHERE name =  'ingest.skip_facet_indexing' AND enabled)) INTO b_skip_facet;
1035     SELECT COALESCE(NULLIF(skip_display, FALSE), EXISTS (SELECT enabled FROM config.internal_flag WHERE name =  'ingest.skip_display_indexing' AND enabled)) INTO b_skip_display;
1036     SELECT COALESCE(NULLIF(skip_browse, FALSE), EXISTS (SELECT enabled FROM config.internal_flag WHERE name =  'ingest.skip_browse_indexing' AND enabled)) INTO b_skip_browse;
1037     SELECT COALESCE(NULLIF(skip_search, FALSE), EXISTS (SELECT enabled FROM config.internal_flag WHERE name =  'ingest.skip_search_indexing' AND enabled)) INTO b_skip_search;
1038
1039     IF NOT b_skip_facet THEN field_types := field_types || '{facet}'; END IF;
1040     IF NOT b_skip_display THEN field_types := field_types || '{display}'; END IF;
1041     IF NOT b_skip_browse THEN field_types := field_types || '{browse}'; END IF;
1042     IF NOT b_skip_search THEN field_types := field_types || '{search}'; END IF;
1043
1044     PERFORM * FROM config.internal_flag WHERE name = 'ingest.assume_inserts_only' AND enabled;
1045     IF NOT FOUND THEN
1046         IF NOT b_skip_search THEN
1047             FOR fclass IN SELECT * FROM config.metabib_class LOOP
1048                 EXECUTE $$DELETE FROM metabib.$$ || fclass.name || $$_field_entry WHERE source = $$ || bib_id || $$ AND field = ANY($1)$$ USING field_list;
1049             END LOOP;
1050         END IF;
1051         IF NOT b_skip_facet THEN
1052             DELETE FROM metabib.facet_entry WHERE source = bib_id AND field = ANY(field_list);
1053         END IF;
1054         IF NOT b_skip_display THEN
1055             DELETE FROM metabib.display_entry WHERE source = bib_id AND field = ANY(field_list);
1056         END IF;
1057         IF NOT b_skip_browse THEN
1058             DELETE FROM metabib.browse_entry_def_map WHERE source = bib_id AND def = ANY(field_list);
1059         END IF;
1060     END IF;
1061
1062     FOR ind_data IN SELECT * FROM biblio.extract_metabib_field_entry( bib_id, ' ', field_types, field_list ) LOOP
1063
1064         -- don't store what has been normalized away
1065         CONTINUE WHEN ind_data.value IS NULL;
1066
1067         IF ind_data.field < 0 THEN
1068             ind_data.field = -1 * ind_data.field;
1069         END IF;
1070
1071         IF ind_data.facet_field AND NOT b_skip_facet THEN
1072             INSERT INTO metabib.facet_entry (field, source, value)
1073                 VALUES (ind_data.field, ind_data.source, ind_data.value);
1074         END IF;
1075
1076         IF ind_data.display_field AND NOT b_skip_display THEN
1077             INSERT INTO metabib.display_entry (field, source, value)
1078                 VALUES (ind_data.field, ind_data.source, ind_data.value);
1079         END IF;
1080
1081
1082         IF ind_data.browse_field AND NOT b_skip_browse THEN
1083             -- A caveat about this SELECT: this should take care of replacing
1084             -- old mbe rows when data changes, but not if normalization (by
1085             -- which I mean specifically the output of
1086             -- evergreen.oils_tsearch2()) changes.  It may or may not be
1087             -- expensive to add a comparison of index_vector to index_vector
1088             -- to the WHERE clause below.
1089
1090             CONTINUE WHEN ind_data.sort_value IS NULL;
1091
1092             value_prepped := metabib.browse_normalize(ind_data.value, ind_data.field);
1093             IF ind_data.browse_nocase THEN -- for "nocase" browse definions, look for a preexisting row that matches case-insensitively on value and use that
1094                 SELECT INTO mbe_row * FROM metabib.browse_entry
1095                     WHERE evergreen.lowercase(value) = evergreen.lowercase(value_prepped) AND sort_value = ind_data.sort_value
1096                     ORDER BY sort_value, value LIMIT 1; -- gotta pick something, I guess
1097             END IF;
1098
1099             IF mbe_row.id IS NOT NULL THEN -- asked to check for, and found, a "nocase" version to use
1100                 mbe_id := mbe_row.id;
1101             ELSE -- otherwise, an UPSERT-protected variant
1102                 INSERT INTO metabib.browse_entry
1103                     ( value, sort_value ) VALUES
1104                     ( value_prepped, ind_data.sort_value )
1105                   ON CONFLICT (sort_value, value) DO UPDATE SET sort_value = EXCLUDED.sort_value -- must update a row to return an existing id
1106                   RETURNING id INTO mbe_id;
1107             END IF;
1108
1109             INSERT INTO metabib.browse_entry_def_map (entry, def, source, authority)
1110                 VALUES (mbe_id, ind_data.field, ind_data.source, ind_data.authority);
1111         END IF;
1112
1113         IF ind_data.search_field AND NOT b_skip_search THEN
1114             -- Avoid inserting duplicate rows
1115             EXECUTE 'SELECT 1 FROM metabib.' || ind_data.field_class ||
1116                 '_field_entry WHERE field = $1 AND source = $2 AND value = $3'
1117                 INTO mbe_id USING ind_data.field, ind_data.source, ind_data.value;
1118                 -- RAISE NOTICE 'Search for an already matching row returned %', mbe_id;
1119             IF mbe_id IS NULL THEN
1120                 EXECUTE $$
1121                 INSERT INTO metabib.$$ || ind_data.field_class || $$_field_entry (field, source, value)
1122                     VALUES ($$ ||
1123                         quote_literal(ind_data.field) || $$, $$ ||
1124                         quote_literal(ind_data.source) || $$, $$ ||
1125                         quote_literal(ind_data.value) ||
1126                     $$);$$;
1127             END IF;
1128         END IF;
1129
1130     END LOOP;
1131
1132     IF NOT b_skip_search THEN
1133         PERFORM metabib.update_combined_index_vectors(bib_id);
1134         PERFORM * FROM config.internal_flag WHERE name = 'ingest.disable_symspell_reification' AND enabled;
1135         IF NOT FOUND THEN
1136             PERFORM search.symspell_dictionary_reify();
1137         END IF;
1138     END IF;
1139
1140     RETURN;
1141 END;
1142 $func$ LANGUAGE PLPGSQL;
1143
1144 -- get rid of old version
1145 DROP FUNCTION authority.indexing_ingest_or_delete;
1146
1147 COMMIT;
1148