LP#1979071: (follow-up) fix various DB schema and update issues
[evergreen-equinox.git] / Open-ILS / src / sql / Pg / upgrade / XXXX.schema.queued_ingest.sql
1 BEGIN;
2
3 INSERT INTO config.global_flag (name, enabled, label) VALUES (
4     'ingest.queued.max_threads',  TRUE,
5     oils_i18n_gettext(
6         'ingest.queued.max_threads',
7         'Queued Ingest: Maximum number of database workers allowed for queued ingest processes',
8         'cgf',
9         'label'
10     )),(
11     'ingest.queued.abort_on_error',  FALSE,
12     oils_i18n_gettext(
13         'ingest.queued.abort_on_error',
14         'Queued Ingest: Abort transaction on ingest error rather than simply logging an error',
15         'cgf',
16         'label'
17     )),(
18     'ingest.queued.authority.propagate',  FALSE,
19     oils_i18n_gettext(
20         'ingest.queued.authority.propagate',
21         'Queued Ingest: Queue all bib record updates on authority change propagation, even if bib queuing is not generally enabled',
22         'cgf',
23         'label'
24     )),(
25     'ingest.queued.all',  FALSE,
26     oils_i18n_gettext(
27         'ingest.queued.all',
28         'Queued Ingest: Use Queued Ingest for all bib and authority record ingest',
29         'cgf',
30         'label'
31     )),(
32     'ingest.queued.biblio.all',  FALSE,
33     oils_i18n_gettext(
34         'ingest.queued.biblio.all',
35         'Queued Ingest: Use Queued Ingest for all bib record ingest',
36         'cgf',
37         'label'
38     )),(
39     'ingest.queued.authority.all',  FALSE,
40     oils_i18n_gettext(
41         'ingest.queued.authority.all',
42         'Queued Ingest: Use Queued Ingest for all authority record ingest',
43         'cgf',
44         'label'
45     )),(
46     'ingest.queued.biblio.insert.marc_edit_inline',  TRUE,
47     oils_i18n_gettext(
48         'ingest.queued.biblio.insert.marc_edit_inline',
49         'Queued Ingest: Do NOT use Queued Ingest when creating a new bib, or undeleting a bib, via the MARC editor',
50         'cgf',
51         'label'
52     )),(
53     'ingest.queued.biblio.insert',  FALSE,
54     oils_i18n_gettext(
55         'ingest.queued.biblio.insert',
56         'Queued Ingest: Use Queued Ingest for bib record ingest on insert and undelete',
57         'cgf',
58         'label'
59     )),(
60     'ingest.queued.authority.insert',  FALSE,
61     oils_i18n_gettext(
62         'ingest.queued.authority.insert',
63         'Queued Ingest: Use Queued Ingest for authority record ingest on insert and undelete',
64         'cgf',
65         'label'
66     )),(
67     'ingest.queued.biblio.update.marc_edit_inline',  TRUE,
68     oils_i18n_gettext(
69         'ingest.queued.biblio.update.marc_edit_inline',
70         'Queued Ingest: Do NOT Use Queued Ingest when editing bib records via the MARC Editor',
71         'cgf',
72         'label'
73     )),(
74     'ingest.queued.biblio.update',  FALSE,
75     oils_i18n_gettext(
76         'ingest.queued.biblio.update',
77         'Queued Ingest: Use Queued Ingest for bib record ingest on update',
78         'cgf',
79         'label'
80     )),(
81     'ingest.queued.authority.update',  FALSE,
82     oils_i18n_gettext(
83         'ingest.queued.authority.update',
84         'Queued Ingest: Use Queued Ingest for authority record ingest on update',
85         'cgf',
86         'label'
87     )),(
88     'ingest.queued.biblio.delete',  FALSE,
89     oils_i18n_gettext(
90         'ingest.queued.biblio.delete',
91         'Queued Ingest: Use Queued Ingest for bib record ingest on delete',
92         'cgf',
93         'label'
94     )),(
95     'ingest.queued.authority.delete',  FALSE,
96     oils_i18n_gettext(
97         'ingest.queued.authority.delete',
98         'Queued Ingest: Use Queued Ingest for authority record ingest on delete',
99         'cgf',
100         'label'
101     )
102 );
103
104 UPDATE config.global_flag SET value = '20' WHERE name = 'ingest.queued.max_threads';
105
106 CREATE OR REPLACE FUNCTION search.symspell_maintain_entries () RETURNS TRIGGER AS $f$
107 DECLARE
108     search_class    TEXT;
109     new_value       TEXT := NULL;
110     old_value       TEXT := NULL;
111     _atag           INTEGER;
112 BEGIN
113
114     IF TG_TABLE_SCHEMA = 'authority' THEN
115         IF TG_OP IN ('INSERT', 'UPDATE') THEN
116             _atag = NEW.atag;
117         ELSE
118             _atag = OLD.atag;
119         END IF;
120
121         SELECT  m.field_class INTO search_class
122           FROM  authority.control_set_auth_field_metabib_field_map_refs a
123                 JOIN config.metabib_field m ON (a.metabib_field=m.id)
124           WHERE a.authority_field = _atag;
125
126         IF NOT FOUND THEN
127             RETURN NULL;
128         END IF;
129     ELSE
130         search_class := COALESCE(TG_ARGV[0], SPLIT_PART(TG_TABLE_NAME,'_',1));
131     END IF;
132
133     IF TG_OP IN ('INSERT', 'UPDATE') THEN
134         new_value := NEW.value;
135     END IF;
136
137     IF TG_OP IN ('DELETE', 'UPDATE') THEN
138         old_value := OLD.value;
139     END IF;
140
141     IF new_value = old_value THEN
142         -- same, move along
143     ELSE
144         INSERT INTO search.symspell_dictionary_updates
145             SELECT  txid_current(), *
146               FROM  search.symspell_build_entries(
147                         new_value,
148                         search_class,
149                         old_value
150                     );
151     END IF;
152
153     -- PERFORM * FROM search.symspell_build_and_merge_entries(new_value, search_class, old_value);
154
155     RETURN NULL; -- always fired AFTER
156 END;
157 $f$ LANGUAGE PLPGSQL;
158
159 CREATE TABLE action.ingest_queue (
160     id          SERIAL      PRIMARY KEY,
161     created     TIMESTAMPTZ NOT NULL DEFAULT NOW(),
162     run_at      TIMESTAMPTZ NOT NULL DEFAULT NOW(),
163     who         INT         REFERENCES actor.usr (id) ON UPDATE CASCADE ON DELETE SET NULL DEFERRABLE INITIALLY DEFERRED,
164     start_time  TIMESTAMPTZ,
165     end_time    TIMESTAMPTZ,
166     threads     INT,
167     why         TEXT
168 );
169
170 CREATE TABLE action.ingest_queue_entry (
171     id          BIGSERIAL   PRIMARY KEY,
172     record      BIGINT      NOT NULL, -- points to a record id of the appropriate record_type
173     record_type TEXT        NOT NULL,
174     action      TEXT        NOT NULL,
175     run_at      TIMESTAMPTZ NOT NULL DEFAULT NOW(),
176     state_data  TEXT        NOT NULL DEFAULT '',
177     queue       INT         REFERENCES action.ingest_queue (id) ON UPDATE CASCADE ON DELETE CASCADE DEFERRABLE INITIALLY DEFERRED,
178     override_by BIGINT      REFERENCES action.ingest_queue_entry (id) ON UPDATE CASCADE ON DELETE SET NULL DEFERRABLE INITIALLY DEFERRED,
179     ingest_time TIMESTAMPTZ,
180     fail_time   TIMESTAMPTZ
181 );
182 CREATE UNIQUE INDEX record_pending_once ON action.ingest_queue_entry (record_type,record,state_data) WHERE ingest_time IS NULL AND override_by IS NULL;
183 CREATE INDEX entry_override_by_idx ON action.ingest_queue_entry (override_by) WHERE override_by IS NOT NULL;
184
185 CREATE OR REPLACE FUNCTION action.enqueue_ingest_entry (
186     record_id       BIGINT,
187     rtype           TEXT DEFAULT 'biblio',
188     when_to_run     TIMESTAMPTZ DEFAULT NOW(),
189     queue_id        INT  DEFAULT NULL,
190     ingest_action   TEXT DEFAULT 'update', -- will be the most common?
191     old_state_data  TEXT DEFAULT ''
192 ) RETURNS BOOL AS $F$
193 DECLARE
194     new_entry       action.ingest_queue_entry%ROWTYPE;
195     prev_del_entry  action.ingest_queue_entry%ROWTYPE;
196     diag_detail     TEXT;
197     diag_context    TEXT;
198 BEGIN
199
200     IF ingest_action = 'delete' THEN
201         -- first see if there is an outstanding entry
202         SELECT  * INTO prev_del_entry
203           FROM  action.ingest_queue_entry
204           WHERE qe.record = record_id
205                 AND qe.state_date = old_state_data
206                 AND qe.record_type = rtype
207                 AND qe.ingest_time IS NULL
208                 AND qe.override_by IS NULL;
209     END IF;
210
211     WITH existing_queue_entry_cte AS (
212         SELECT  queue_id AS queue,
213                 rtype AS record_type,
214                 record_id AS record,
215                 qe.id AS override_by,
216                 ingest_action AS action,
217                 q.run_at AS run_at,
218                 old_state_data AS state_data
219           FROM  action.ingest_queue_entry qe
220                 JOIN action.ingest_queue q ON (qe.queue = q.id)
221           WHERE qe.record = record_id
222                 AND q.end_time IS NULL
223                 AND qe.record_type = rtype
224                 AND qe.state_data = old_state_data
225                 AND qe.ingest_time IS NULL
226                 AND qe.fail_time IS NULL
227                 AND qe.override_by IS NULL
228     ), existing_nonqueue_entry_cte AS (
229         SELECT  queue_id AS queue,
230                 rtype AS record_type,
231                 record_id AS record,
232                 qe.id AS override_by,
233                 ingest_action AS action,
234                 qe.run_at AS run_at,
235                 old_state_data AS state_data
236           FROM  action.ingest_queue_entry qe
237           WHERE qe.record = record_id
238                 AND qe.queue IS NULL
239                 AND qe.record_type = rtype
240                 AND qe.state_data = old_state_data
241                 AND qe.ingest_time IS NULL
242                 AND qe.fail_time IS NULL
243                 AND qe.override_by IS NULL
244     ), new_entry_cte AS (
245         SELECT * FROM existing_queue_entry_cte
246           UNION ALL
247         SELECT * FROM existing_nonqueue_entry_cte
248           UNION ALL
249         SELECT queue_id, rtype, record_id, NULL, ingest_action, COALESCE(when_to_run,NOW()), old_state_data
250     ), insert_entry_cte AS (
251         INSERT INTO action.ingest_queue_entry
252             (queue, record_type, record, override_by, action, run_at, state_data)
253           SELECT queue, record_type, record, override_by, action, run_at, state_data FROM new_entry_cte
254             ORDER BY 4 NULLS LAST, 6
255             LIMIT 1
256         RETURNING *
257     ) SELECT * INTO new_entry FROM insert_entry_cte;
258
259     IF prev_del_entry.id IS NOT NULL THEN -- later delete overrides earlier unapplied entry
260         UPDATE  action.ingest_queue_entry
261           SET   override_by = new_entry.id
262           WHERE id = prev_del_entry.id;
263
264         UPDATE  action.ingest_queue_entry
265           SET   override_by = NULL
266           WHERE id = new_entry.id;
267
268     ELSIF new_entry.override_by IS NOT NULL THEN
269         RETURN TRUE; -- already handled, don't notify
270     END IF;
271
272     NOTIFY queued_ingest;
273
274     RETURN TRUE;
275 EXCEPTION WHEN OTHERS THEN
276     GET STACKED DIAGNOSTICS diag_detail  = PG_EXCEPTION_DETAIL,
277                             diag_context = PG_EXCEPTION_CONTEXT;
278     RAISE WARNING '%\n%', diag_detail, diag_context;
279     RETURN FALSE;
280 END;
281 $F$ LANGUAGE PLPGSQL;
282
283 CREATE OR REPLACE FUNCTION action.process_ingest_queue_entry (qeid BIGINT) RETURNS BOOL AS $func$
284 DECLARE
285     ingest_success  BOOL := NULL;
286     qe              action.ingest_queue_entry%ROWTYPE;
287     aid             authority.record_entry.id%TYPE;
288 BEGIN
289
290     SELECT * INTO qe FROM action.ingest_queue_entry WHERE id = qeid;
291     IF qe.ingest_time IS NOT NULL OR qe.override_by IS NOT NULL THEN
292         RETURN TRUE; -- Already done
293     END IF;
294
295     IF qe.action = 'delete' THEN
296         IF qe.record_type = 'biblio' THEN
297             SELECT metabib.indexing_delete(r.*, qe.state_data) INTO ingest_success FROM biblio.record_entry r WHERE r.id = qe.record;
298         ELSIF qe.record_type = 'authority' THEN
299             SELECT authority.indexing_delete(r.*, qe.state_data) INTO ingest_success FROM authority.record_entry r WHERE r.id = qe.record;
300         END IF;
301     ELSE
302         IF qe.record_type = 'biblio' THEN
303             IF qe.action = 'propagate' THEN
304                 SELECT authority.apply_propagate_changes(qe.state_data::BIGINT, qe.record) INTO aid;
305                 SELECT aid = qe.state_data::BIGINT INTO ingest_success;
306             ELSE
307                 SELECT metabib.indexing_update(r.*, qe.action = 'insert', qe.state_data) INTO ingest_success FROM biblio.record_entry r WHERE r.id = qe.record;
308             END IF;
309         ELSIF qe.record_type = 'authority' THEN
310             SELECT authority.indexing_update(r.*, qe.action = 'insert', qe.state_data) INTO ingest_success FROM authority.record_entry r WHERE r.id = qe.record;
311         END IF;
312     END IF;
313
314     IF NOT ingest_success THEN
315         UPDATE action.ingest_queue_entry SET fail_time = NOW() WHERE id = qe.id;
316         PERFORM * FROM config.internal_flag WHERE name = 'ingest.queued.abort_on_error' AND enabled;
317         IF FOUND THEN
318             RAISE EXCEPTION 'Ingest action of % on %.record_entry % for queue entry % failed', qe.action, qe.record_type, qe.record, qe.id;
319         ELSE
320             RAISE WARNING 'Ingest action of % on %.record_entry % for queue entry % failed', qe.action, qe.record_type, qe.record, qe.id;
321         END IF;
322     ELSE
323         UPDATE action.ingest_queue_entry SET ingest_time = NOW() WHERE id = qe.id;
324     END IF;
325
326     RETURN ingest_success;
327 END;
328 $func$ LANGUAGE PLPGSQL;
329
330 CREATE OR REPLACE FUNCTION action.complete_duplicated_entries () RETURNS TRIGGER AS $F$
331 BEGIN
332     IF NEW.ingest_time IS NOT NULL THEN
333         UPDATE action.ingest_queue_entry SET ingest_time = NEW.ingest_time WHERE override_by = NEW.id;
334     END IF;
335
336     RETURN NULL;
337 END;
338 $F$ LANGUAGE PLPGSQL;
339
340 CREATE TRIGGER complete_duplicated_entries_trigger
341     AFTER UPDATE ON action.ingest_queue_entry
342     FOR EACH ROW WHEN (NEW.override_by IS NULL)
343     EXECUTE PROCEDURE action.complete_duplicated_entries();
344
345 CREATE OR REPLACE FUNCTION action.set_ingest_queue(INT) RETURNS VOID AS $$
346     $_SHARED{"ingest_queue_id"} = $_[0];
347 $$ LANGUAGE plperlu;
348
349 CREATE OR REPLACE FUNCTION action.get_ingest_queue() RETURNS INT AS $$
350     return $_SHARED{"ingest_queue_id"};
351 $$ LANGUAGE plperlu;
352
353 CREATE OR REPLACE FUNCTION action.clear_ingest_queue() RETURNS VOID AS $$
354     delete($_SHARED{"ingest_queue_id"});
355 $$ LANGUAGE plperlu;
356
357 CREATE OR REPLACE FUNCTION action.set_queued_ingest_force(TEXT) RETURNS VOID AS $$
358     $_SHARED{"ingest_queue_force"} = $_[0];
359 $$ LANGUAGE plperlu;
360
361 CREATE OR REPLACE FUNCTION action.get_queued_ingest_force() RETURNS TEXT AS $$
362     return $_SHARED{"ingest_queue_force"};
363 $$ LANGUAGE plperlu;
364
365 CREATE OR REPLACE FUNCTION action.clear_queued_ingest_force() RETURNS VOID AS $$
366     delete($_SHARED{"ingest_queue_force"});
367 $$ LANGUAGE plperlu;
368
369 ------------------ ingest functions ------------------
370
371 CREATE OR REPLACE FUNCTION metabib.indexing_delete (bib biblio.record_entry, extra TEXT DEFAULT NULL) RETURNS BOOL AS $func$
372 DECLARE
373     tmp_bool BOOL;
374     diag_detail     TEXT;
375     diag_context    TEXT;
376 BEGIN
377     PERFORM * FROM config.internal_flag WHERE name = 'ingest.metarecord_mapping.preserve_on_delete' AND enabled;
378     tmp_bool := FOUND;
379
380     PERFORM metabib.remap_metarecord_for_bib(bib.id, bib.fingerprint, TRUE, tmp_bool);
381
382     IF NOT tmp_bool THEN
383         -- One needs to keep these around to support searches
384         -- with the #deleted modifier, so one should turn on the named
385         -- internal flag for that functionality.
386         DELETE FROM metabib.record_attr_vector_list WHERE source = bib.id;
387     END IF;
388
389     DELETE FROM authority.bib_linking abl WHERE abl.bib = bib.id; -- Avoid updating fields in bibs that are no longer visible
390     DELETE FROM biblio.peer_bib_copy_map WHERE peer_record = bib.id; -- Separate any multi-homed items
391     DELETE FROM metabib.browse_entry_def_map WHERE source = bib.id; -- Don't auto-suggest deleted bibs
392
393     RETURN TRUE;
394 EXCEPTION WHEN OTHERS THEN
395     GET STACKED DIAGNOSTICS diag_detail  = PG_EXCEPTION_DETAIL,
396                             diag_context = PG_EXCEPTION_CONTEXT;
397     RAISE WARNING '%\n%', diag_detail, diag_context;
398     RETURN FALSE;
399 END;
400 $func$ LANGUAGE PLPGSQL;
401
402 CREATE OR REPLACE FUNCTION metabib.indexing_update (bib biblio.record_entry, insert_only BOOL DEFAULT FALSE, extra TEXT DEFAULT NULL) RETURNS BOOL AS $func$
403 DECLARE
404     skip_facet   BOOL   := FALSE;
405     skip_display BOOL   := FALSE;
406     skip_browse  BOOL   := FALSE;
407     skip_search  BOOL   := FALSE;
408     skip_auth    BOOL   := FALSE;
409     skip_full    BOOL   := FALSE;
410     skip_attrs   BOOL   := FALSE;
411     skip_luri    BOOL   := FALSE;
412     skip_mrmap   BOOL   := FALSE;
413     only_attrs   TEXT[] := NULL;
414     only_fields  INT[]  := '{}'::INT[];
415     diag_detail     TEXT;
416     diag_context    TEXT;
417 BEGIN
418
419     -- Record authority linking
420     SELECT extra LIKE '%skip_authority%' INTO skip_auth;
421     PERFORM * FROM config.internal_flag WHERE name = 'ingest.disable_authority_linking' AND enabled;
422     IF NOT FOUND AND NOT skip_auth THEN
423         PERFORM biblio.map_authority_linking( bib.id, bib.marc );
424     END IF;
425
426     -- Flatten and insert the mfr data
427     SELECT extra LIKE '%skip_full_rec%' INTO skip_full;
428     PERFORM * FROM config.internal_flag WHERE name = 'ingest.disable_metabib_full_rec' AND enabled;
429     IF NOT FOUND AND NOT skip_full THEN
430         PERFORM metabib.reingest_metabib_full_rec(bib.id);
431     END IF;
432
433     -- Now we pull out attribute data, which is dependent on the mfr for all but XPath-based fields
434     SELECT extra LIKE '%skip_attrs%' INTO skip_attrs;
435     PERFORM * FROM config.internal_flag WHERE name = 'ingest.disable_metabib_rec_descriptor' AND enabled;
436     IF NOT FOUND AND NOT skip_attrs THEN
437         IF extra ~ 'attr\(\s*(\w[ ,\w]*?)\s*\)' THEN
438             SELECT REGEXP_SPLIT_TO_ARRAY(
439                 (REGEXP_MATCHES(extra, 'attr\(\s*(\w[ ,\w]*?)\s*\)'))[1],
440                 '\s*,\s*'
441             ) INTO only_attrs;
442         END IF;
443
444         PERFORM metabib.reingest_record_attributes(bib.id, only_attrs, bib.marc, insert_only);
445     END IF;
446
447     -- Gather and insert the field entry data
448     SELECT extra LIKE '%skip_facet%' INTO skip_facet;
449     SELECT extra LIKE '%skip_display%' INTO skip_display;
450     SELECT extra LIKE '%skip_browse%' INTO skip_browse;
451     SELECT extra LIKE '%skip_search%' INTO skip_search;
452
453     IF extra ~ 'field_list\(\s*(\d[ ,\d]+)\s*\)' THEN
454         SELECT REGEXP_SPLIT_TO_ARRAY(
455             (REGEXP_MATCHES(extra, 'field_list\(\s*(\d[ ,\d]+)\s*\)'))[1],
456             '\s*,\s*'
457         )::INT[] INTO only_fields;
458     END IF;
459
460     IF NOT skip_facet OR NOT skip_display OR NOT skip_browse OR NOT skip_search THEN
461         PERFORM metabib.reingest_metabib_field_entries(bib.id, skip_facet, skip_display, skip_browse, skip_search, only_fields);
462     END IF;
463
464     -- Located URI magic
465     SELECT extra LIKE '%skip_luri%' INTO skip_luri;
466     PERFORM * FROM config.internal_flag WHERE name = 'ingest.disable_located_uri' AND enabled;
467     IF NOT FOUND AND NOT skip_luri THEN PERFORM biblio.extract_located_uris( bib.id, bib.marc, bib.editor ); END IF;
468
469     -- (re)map metarecord-bib linking
470     SELECT extra LIKE '%skip_mrmap%' INTO skip_mrmap;
471     IF insert_only THEN -- if not deleted and performing an insert, check for the flag
472         PERFORM * FROM config.internal_flag WHERE name = 'ingest.metarecord_mapping.skip_on_insert' AND enabled;
473         IF NOT FOUND AND NOT skip_mrmap THEN
474             PERFORM metabib.remap_metarecord_for_bib( bib.id, bib.fingerprint );
475         END IF;
476     ELSE -- we're doing an update, and we're not deleted, remap
477         PERFORM * FROM config.internal_flag WHERE name = 'ingest.metarecord_mapping.skip_on_update' AND enabled;
478         IF NOT FOUND AND NOT skip_mrmap THEN
479             PERFORM metabib.remap_metarecord_for_bib( bib.id, bib.fingerprint );
480         END IF;
481     END IF;
482
483     RETURN TRUE;
484 EXCEPTION WHEN OTHERS THEN
485     GET STACKED DIAGNOSTICS diag_detail  = PG_EXCEPTION_DETAIL,
486                             diag_context = PG_EXCEPTION_CONTEXT;
487     RAISE WARNING '%\n%', diag_detail, diag_context;
488     RETURN FALSE;
489 END;
490 $func$ LANGUAGE PLPGSQL;
491
492 CREATE OR REPLACE FUNCTION authority.indexing_delete (auth authority.record_entry, extra TEXT DEFAULT NULL) RETURNS BOOL AS $func$
493 DECLARE
494     tmp_bool BOOL;
495     diag_detail     TEXT;
496     diag_context    TEXT;
497 BEGIN
498     DELETE FROM authority.bib_linking WHERE authority = NEW.id; -- Avoid updating fields in bibs that are no longer visible
499     DELETE FROM authority.full_rec WHERE record = NEW.id; -- Avoid validating fields against deleted authority records
500     DELETE FROM authority.simple_heading WHERE record = NEW.id;
501       -- Should remove matching $0 from controlled fields at the same time?
502
503     -- XXX What do we about the actual linking subfields present in
504     -- authority records that target this one when this happens?
505     DELETE FROM authority.authority_linking WHERE source = NEW.id OR target = NEW.id;
506
507     RETURN TRUE;
508 EXCEPTION WHEN OTHERS THEN
509     GET STACKED DIAGNOSTICS diag_detail  = PG_EXCEPTION_DETAIL,
510                             diag_context = PG_EXCEPTION_CONTEXT;
511     RAISE WARNING '%\n%', diag_detail, diag_context;
512     RETURN FALSE;
513 END;
514 $func$ LANGUAGE PLPGSQL;
515
516
517 CREATE OR REPLACE FUNCTION authority.indexing_update (auth authority.record_entry, insert_only BOOL DEFAULT FALSE, old_heading TEXT DEFAULT NULL) RETURNS BOOL AS $func$
518 DECLARE
519     ashs    authority.simple_heading%ROWTYPE;
520     mbe_row metabib.browse_entry%ROWTYPE;
521     mbe_id  BIGINT;
522     ash_id  BIGINT;
523     diag_detail     TEXT;
524     diag_context    TEXT;
525 BEGIN
526
527     -- Unless there's a setting stopping us, propagate these updates to any linked bib records when the heading changes
528     PERFORM * FROM config.internal_flag WHERE name = 'ingest.disable_authority_auto_update' AND enabled;
529
530     IF NOT FOUND AND auth.heading <> old_heading THEN
531         PERFORM authority.propagate_changes(auth.id);
532     END IF;
533
534     IF NOT insert_only THEN
535         DELETE FROM authority.authority_linking WHERE source = auth.id;
536         DELETE FROM authority.simple_heading WHERE record = auth.id;
537     END IF;
538
539     INSERT INTO authority.authority_linking (source, target, field)
540         SELECT source, target, field FROM authority.calculate_authority_linking(
541             auth.id, auth.control_set, auth.marc::XML
542         );
543
544     FOR ashs IN SELECT * FROM authority.simple_heading_set(auth.marc) LOOP
545
546         INSERT INTO authority.simple_heading (record,atag,value,sort_value,thesaurus)
547             VALUES (ashs.record, ashs.atag, ashs.value, ashs.sort_value, ashs.thesaurus);
548             ash_id := CURRVAL('authority.simple_heading_id_seq'::REGCLASS);
549
550         SELECT INTO mbe_row * FROM metabib.browse_entry
551             WHERE value = ashs.value AND sort_value = ashs.sort_value;
552
553         IF FOUND THEN
554             mbe_id := mbe_row.id;
555         ELSE
556             INSERT INTO metabib.browse_entry
557                 ( value, sort_value ) VALUES
558                 ( ashs.value, ashs.sort_value );
559
560             mbe_id := CURRVAL('metabib.browse_entry_id_seq'::REGCLASS);
561         END IF;
562
563         INSERT INTO metabib.browse_entry_simple_heading_map (entry,simple_heading) VALUES (mbe_id,ash_id);
564
565     END LOOP;
566
567     -- Flatten and insert the afr data
568     PERFORM * FROM config.internal_flag WHERE name = 'ingest.disable_authority_full_rec' AND enabled;
569     IF NOT FOUND THEN
570         PERFORM authority.reingest_authority_full_rec(auth.id);
571         PERFORM * FROM config.internal_flag WHERE name = 'ingest.disable_authority_rec_descriptor' AND enabled;
572         IF NOT FOUND THEN
573             PERFORM authority.reingest_authority_rec_descriptor(auth.id);
574         END IF;
575     END IF;
576
577     PERFORM * FROM config.internal_flag WHERE name = 'ingest.disable_symspell_reification' AND enabled;
578     IF NOT FOUND THEN
579         PERFORM search.symspell_dictionary_reify();
580     END IF;
581
582     RETURN TRUE;
583 EXCEPTION WHEN OTHERS THEN
584     GET STACKED DIAGNOSTICS diag_detail  = PG_EXCEPTION_DETAIL,
585                             diag_context = PG_EXCEPTION_CONTEXT;
586     RAISE WARNING '%\n%', diag_detail, diag_context;
587     RETURN FALSE;
588 END;
589 $func$ LANGUAGE PLPGSQL;
590
591 CREATE OR REPLACE FUNCTION evergreen.indexing_ingest_or_delete () RETURNS TRIGGER AS $func$
592 DECLARE
593     old_state_data      TEXT := '';
594     new_action          TEXT;
595     queuing_force       TEXT;
596     queuing_flag_name   TEXT;
597     queuing_flag        BOOL := FALSE;
598     queuing_success     BOOL := FALSE;
599     ingest_success      BOOL := FALSE;
600     ingest_queue        INT;
601 BEGIN
602
603     -- Identify the ingest action type
604     IF TG_OP = 'UPDATE' THEN
605
606         -- Gather type-specific data for later use
607         IF TG_TABLE_SCHEMA = 'authority' THEN
608             old_state_data = OLD.heading;
609         END IF;
610
611         IF NOT OLD.deleted THEN -- maybe reingest?
612             IF NEW.deleted THEN
613                 new_action = 'delete'; -- nope, delete
614             ELSE
615                 new_action = 'update'; -- yes, update
616             END IF;
617         ELSIF NOT NEW.deleted THEN
618             new_action = 'insert'; -- revivify, AKA insert
619         ELSE
620             RETURN NEW; -- was and is still deleted, don't ingest
621         END IF;
622     ELSIF TG_OP = 'INSERT' THEN
623         new_action = 'insert'; -- brand new
624     ELSE
625         RETURN OLD; -- really deleting the record
626     END IF;
627
628     queuing_flag_name := 'ingest.queued.'||TG_TABLE_SCHEMA||'.'||new_action;
629     -- See if we should be queuing anything
630     SELECT  enabled INTO queuing_flag
631       FROM  config.internal_flag
632       WHERE name IN ('ingest.queued.all','ingest.queued.'||TG_TABLE_SCHEMA||'.all', queuing_flag_name)
633             AND enabled
634       LIMIT 1;
635
636     SELECT action.get_queued_ingest_force() INTO queuing_force;
637     IF queuing_flag IS NULL AND queuing_force = queuing_flag_name THEN
638         queuing_flag := TRUE;
639     END IF;
640
641     -- you (or part of authority propagation) can forcibly disable specific queuing actions
642     IF queuing_force = queuing_flag_name||'.disabled' THEN
643         queuing_flag := FALSE;
644     END IF;
645
646     -- And if we should be queuing ...
647     IF queuing_flag THEN
648         ingest_queue := action.get_ingest_queue();
649
650         -- ... but this is NOT a named or forced queue request (marc editor update, say, or vandelay overlay)...
651         IF queuing_force IS NULL AND ingest_queue IS NULL AND new_action = 'update' THEN -- re-ingest?
652
653             PERFORM * FROM config.internal_flag WHERE name = 'ingest.reingest.force_on_same_marc' AND enabled;
654
655             --  ... then don't do anything if ingest.reingest.force_on_same_marc is not enabled and the MARC hasn't changed
656             IF NOT FOUND AND OLD.marc = NEW.marc THEN
657                 RETURN NEW;
658             END IF;
659         END IF;
660
661         -- Otherwise, attempt to enqueue
662         SELECT action.enqueue_ingest_entry( NEW.id, TG_TABLE_SCHEMA, NOW(), ingest_queue, new_action, old_state_data) INTO queuing_success;
663     END IF;
664
665     -- If queuing was not requested, or failed for some reason, do it live.
666     IF NOT queuing_success THEN
667         IF queuing_flag THEN
668             RAISE WARNING 'Enqueuing of %.record_entry % for ingest failed, attempting direct ingest', TG_TABLE_SCHEMA, NEW.id;
669         END IF;
670
671         IF new_action = 'delete' THEN
672             IF TG_TABLE_SCHEMA = 'biblio' THEN
673                 SELECT metabib.indexing_delete(NEW.*, old_state_data) INTO ingest_success;
674             ELSIF TG_TABLE_SCHEMA = 'authority' THEN
675                 SELECT authority.indexing_delete(NEW.*, old_state_data) INTO ingest_success;
676             END IF;
677         ELSE
678             IF TG_TABLE_SCHEMA = 'biblio' THEN
679                 SELECT metabib.indexing_update(NEW.*, new_action = 'insert', old_state_data) INTO ingest_success;
680             ELSIF TG_TABLE_SCHEMA = 'authority' THEN
681                 SELECT authority.indexing_update(NEW.*, new_action = 'insert', old_state_data) INTO ingest_success;
682             END IF;
683         END IF;
684         
685         IF NOT ingest_success THEN
686             PERFORM * FROM config.internal_flag WHERE name = 'ingest.queued.abort_on_error' AND enabled;
687             IF FOUND THEN
688                 RAISE EXCEPTION 'Ingest of %.record_entry % failed', TG_TABLE_SCHEMA, NEW.id;
689             ELSE
690                 RAISE WARNING 'Ingest of %.record_entry % failed', TG_TABLE_SCHEMA, NEW.id;
691             END IF;
692         END IF;
693     END IF;
694
695     RETURN NEW;
696 END;
697 $func$ LANGUAGE PLPGSQL;
698
699 DROP TRIGGER aaa_indexing_ingest_or_delete ON biblio.record_entry;
700 DROP TRIGGER aaa_auth_ingest_or_delete ON authority.record_entry;
701
702 CREATE TRIGGER aaa_indexing_ingest_or_delete AFTER INSERT OR UPDATE ON biblio.record_entry FOR EACH ROW EXECUTE PROCEDURE evergreen.indexing_ingest_or_delete ();
703 CREATE TRIGGER aaa_auth_ingest_or_delete AFTER INSERT OR UPDATE ON authority.record_entry FOR EACH ROW EXECUTE PROCEDURE evergreen.indexing_ingest_or_delete ();
704
705 CREATE OR REPLACE FUNCTION metabib.reingest_record_attributes (rid BIGINT, pattr_list TEXT[] DEFAULT NULL, prmarc TEXT DEFAULT NULL, rdeleted BOOL DEFAULT TRUE) RETURNS VOID AS $func$
706 DECLARE
707     transformed_xml TEXT;
708     rmarc           TEXT := prmarc;
709     tmp_val         TEXT;
710     prev_xfrm       TEXT;
711     normalizer      RECORD;
712     xfrm            config.xml_transform%ROWTYPE;
713     attr_vector     INT[] := '{}'::INT[];
714     attr_vector_tmp INT[];
715     attr_list       TEXT[] := pattr_list;
716     attr_value      TEXT[];
717     norm_attr_value TEXT[];
718     tmp_xml         TEXT;
719     tmp_array       TEXT[];
720     attr_def        config.record_attr_definition%ROWTYPE;
721     ccvm_row        config.coded_value_map%ROWTYPE;
722     jump_past       BOOL;
723 BEGIN
724
725     IF attr_list IS NULL OR rdeleted THEN -- need to do the full dance on INSERT or undelete
726         SELECT ARRAY_AGG(name) INTO attr_list FROM config.record_attr_definition
727         WHERE (
728             tag IS NOT NULL OR
729             fixed_field IS NOT NULL OR
730             xpath IS NOT NULL OR
731             phys_char_sf IS NOT NULL OR
732             composite
733         ) AND (
734             filter OR sorter
735         );
736     END IF;
737
738     IF rmarc IS NULL THEN
739         SELECT marc INTO rmarc FROM biblio.record_entry WHERE id = rid;
740     END IF;
741
742     FOR attr_def IN SELECT * FROM config.record_attr_definition WHERE NOT composite AND name = ANY( attr_list ) ORDER BY format LOOP
743
744         jump_past := FALSE; -- This gets set when we are non-multi and have found something
745         attr_value := '{}'::TEXT[];
746         norm_attr_value := '{}'::TEXT[];
747         attr_vector_tmp := '{}'::INT[];
748
749         SELECT * INTO ccvm_row FROM config.coded_value_map c WHERE c.ctype = attr_def.name LIMIT 1;
750
751         IF attr_def.tag IS NOT NULL THEN -- tag (and optional subfield list) selection
752             SELECT  ARRAY_AGG(value) INTO attr_value
753               FROM  (SELECT * FROM metabib.full_rec ORDER BY tag, subfield) AS x
754               WHERE record = rid
755                     AND tag LIKE attr_def.tag
756                     AND CASE
757                         WHEN attr_def.sf_list IS NOT NULL
758                             THEN POSITION(subfield IN attr_def.sf_list) > 0
759                         ELSE TRUE
760                     END
761               GROUP BY tag
762               ORDER BY tag;
763
764             IF NOT attr_def.multi THEN
765                 attr_value := ARRAY[ARRAY_TO_STRING(attr_value, COALESCE(attr_def.joiner,' '))];
766                 jump_past := TRUE;
767             END IF;
768         END IF;
769
770         IF NOT jump_past AND attr_def.fixed_field IS NOT NULL THEN -- a named fixed field, see config.marc21_ff_pos_map.fixed_field
771             attr_value := attr_value || vandelay.marc21_extract_fixed_field_list(rmarc, attr_def.fixed_field);
772
773             IF NOT attr_def.multi THEN
774                 attr_value := ARRAY[attr_value[1]];
775                 jump_past := TRUE;
776             END IF;
777         END IF;
778
779         IF NOT jump_past AND attr_def.xpath IS NOT NULL THEN -- and xpath expression
780
781             SELECT INTO xfrm * FROM config.xml_transform WHERE name = attr_def.format;
782
783             -- See if we can skip the XSLT ... it's expensive
784             IF prev_xfrm IS NULL OR prev_xfrm <> xfrm.name THEN
785                 -- Can't skip the transform
786                 IF xfrm.xslt <> '---' THEN
787                     transformed_xml := oils_xslt_process(rmarc,xfrm.xslt);
788                 ELSE
789                     transformed_xml := rmarc;
790                 END IF;
791
792                 prev_xfrm := xfrm.name;
793             END IF;
794
795             IF xfrm.name IS NULL THEN
796                 -- just grab the marcxml (empty) transform
797                 SELECT INTO xfrm * FROM config.xml_transform WHERE xslt = '---' LIMIT 1;
798                 prev_xfrm := xfrm.name;
799             END IF;
800
801             FOR tmp_xml IN SELECT UNNEST(oils_xpath(attr_def.xpath, transformed_xml, ARRAY[ARRAY[xfrm.prefix, xfrm.namespace_uri]])) LOOP
802                 tmp_val := oils_xpath_string(
803                                 '//*',
804                                 tmp_xml,
805                                 COALESCE(attr_def.joiner,' '),
806                                 ARRAY[ARRAY[xfrm.prefix, xfrm.namespace_uri]]
807                             );
808                 IF tmp_val IS NOT NULL AND BTRIM(tmp_val) <> '' THEN
809                     attr_value := attr_value || tmp_val;
810                     EXIT WHEN NOT attr_def.multi;
811                 END IF;
812             END LOOP;
813         END IF;
814
815         IF NOT jump_past AND attr_def.phys_char_sf IS NOT NULL THEN -- a named Physical Characteristic, see config.marc21_physical_characteristic_*_map
816             SELECT  ARRAY_AGG(m.value) INTO tmp_array
817               FROM  vandelay.marc21_physical_characteristics(rmarc) v
818                     LEFT JOIN config.marc21_physical_characteristic_value_map m ON (m.id = v.value)
819               WHERE v.subfield = attr_def.phys_char_sf AND (m.value IS NOT NULL AND BTRIM(m.value) <> '')
820                     AND ( ccvm_row.id IS NULL OR ( ccvm_row.id IS NOT NULL AND v.id IS NOT NULL) );
821
822             attr_value := attr_value || tmp_array;
823
824             IF NOT attr_def.multi THEN
825                 attr_value := ARRAY[attr_value[1]];
826             END IF;
827
828         END IF;
829
830                 -- apply index normalizers to attr_value
831         FOR tmp_val IN SELECT value FROM UNNEST(attr_value) x(value) LOOP
832             FOR normalizer IN
833                 SELECT  n.func AS func,
834                         n.param_count AS param_count,
835                         m.params AS params
836                   FROM  config.index_normalizer n
837                         JOIN config.record_attr_index_norm_map m ON (m.norm = n.id)
838                   WHERE attr = attr_def.name
839                   ORDER BY m.pos LOOP
840                     EXECUTE 'SELECT ' || normalizer.func || '(' ||
841                     COALESCE( quote_literal( tmp_val ), 'NULL' ) ||
842                         CASE
843                             WHEN normalizer.param_count > 0
844                                 THEN ',' || REPLACE(REPLACE(BTRIM(normalizer.params,'[]'),E'\'',E'\\\''),E'"',E'\'')
845                                 ELSE ''
846                             END ||
847                     ')' INTO tmp_val;
848
849             END LOOP;
850             IF tmp_val IS NOT NULL AND tmp_val <> '' THEN
851                 -- note that a string that contains only blanks
852                 -- is a valid value for some attributes
853                 norm_attr_value := norm_attr_value || tmp_val;
854             END IF;
855         END LOOP;
856
857         IF attr_def.filter THEN
858             -- Create unknown uncontrolled values and find the IDs of the values
859             IF ccvm_row.id IS NULL THEN
860                 FOR tmp_val IN SELECT value FROM UNNEST(norm_attr_value) x(value) LOOP
861                     IF tmp_val IS NOT NULL AND BTRIM(tmp_val) <> '' THEN
862                         BEGIN -- use subtransaction to isolate unique constraint violations
863                             INSERT INTO metabib.uncontrolled_record_attr_value ( attr, value ) VALUES ( attr_def.name, tmp_val );
864                         EXCEPTION WHEN unique_violation THEN END;
865                     END IF;
866                 END LOOP;
867
868                 SELECT ARRAY_AGG(id) INTO attr_vector_tmp FROM metabib.uncontrolled_record_attr_value WHERE attr = attr_def.name AND value = ANY( norm_attr_value );
869             ELSE
870                 SELECT ARRAY_AGG(id) INTO attr_vector_tmp FROM config.coded_value_map WHERE ctype = attr_def.name AND code = ANY( norm_attr_value );
871             END IF;
872
873             -- Add the new value to the vector
874             attr_vector := attr_vector || attr_vector_tmp;
875         END IF;
876
877         IF attr_def.sorter THEN
878             DELETE FROM metabib.record_sorter WHERE source = rid AND attr = attr_def.name;
879             IF norm_attr_value[1] IS NOT NULL THEN
880                 INSERT INTO metabib.record_sorter (source, attr, value) VALUES (rid, attr_def.name, norm_attr_value[1]);
881             END IF;
882         END IF;
883
884     END LOOP;
885
886 /* We may need to rewrite the vlist to contain
887    the intersection of new values for requested
888    attrs and old values for ignored attrs. To
889    do this, we take the old attr vlist and
890    subtract any values that are valid for the
891    requested attrs, and then add back the new
892    set of attr values. */
893
894     IF ARRAY_LENGTH(pattr_list, 1) > 0 THEN
895         SELECT vlist INTO attr_vector_tmp FROM metabib.record_attr_vector_list WHERE source = rid;
896         SELECT attr_vector_tmp - ARRAY_AGG(id::INT) INTO attr_vector_tmp FROM metabib.full_attr_id_map WHERE attr = ANY (pattr_list);
897         attr_vector := attr_vector || attr_vector_tmp;
898     END IF;
899
900     -- On to composite attributes, now that the record attrs have been pulled.  Processed in name order, so later composite
901     -- attributes can depend on earlier ones.
902     PERFORM metabib.compile_composite_attr_cache_init();
903     FOR attr_def IN SELECT * FROM config.record_attr_definition WHERE composite AND name = ANY( attr_list ) ORDER BY name LOOP
904
905         FOR ccvm_row IN SELECT * FROM config.coded_value_map c WHERE c.ctype = attr_def.name ORDER BY value LOOP
906
907             tmp_val := metabib.compile_composite_attr( ccvm_row.id );
908             CONTINUE WHEN tmp_val IS NULL OR tmp_val = ''; -- nothing to do
909
910             IF attr_def.filter THEN
911                 IF attr_vector @@ tmp_val::query_int THEN
912                     attr_vector = attr_vector + intset(ccvm_row.id);
913                     EXIT WHEN NOT attr_def.multi;
914                 END IF;
915             END IF;
916
917             IF attr_def.sorter THEN
918                 IF attr_vector @@ tmp_val THEN
919                     DELETE FROM metabib.record_sorter WHERE source = rid AND attr = attr_def.name;
920                     INSERT INTO metabib.record_sorter (source, attr, value) VALUES (rid, attr_def.name, ccvm_row.code);
921                 END IF;
922             END IF;
923
924         END LOOP;
925
926     END LOOP;
927
928     IF ARRAY_LENGTH(attr_vector, 1) > 0 THEN
929         INSERT INTO metabib.record_attr_vector_list (source, vlist) VALUES (rid, attr_vector)
930             ON CONFLICT (source) DO UPDATE SET vlist = EXCLUDED.vlist;
931     END IF;
932
933 END;
934
935 $func$ LANGUAGE PLPGSQL;
936
937 CREATE OR REPLACE FUNCTION authority.propagate_changes
938     (aid BIGINT, bid BIGINT) RETURNS BIGINT AS $func$
939 DECLARE
940     queuing_success BOOL := FALSE;
941 BEGIN
942
943     PERFORM 1 FROM config.global_flag
944         WHERE name IN ('ingest.queued.all','ingest.queued.authority.propagate')
945             AND enabled;
946
947     IF FOUND THEN
948         -- XXX enqueue special 'propagate' bib action
949         SELECT action.enqueue_ingest_entry( bid, 'biblio', NOW(), NULL, 'propagate', aid::TEXT) INTO queuing_success;
950
951         IF queuing_success THEN
952             RETURN aid;
953         END IF;
954     END IF;
955
956     PERFORM authority.apply_propagate_changes(aid, bid);
957     RETURN aid;
958 END;
959 $func$ LANGUAGE PLPGSQL;
960
961 CREATE OR REPLACE FUNCTION authority.apply_propagate_changes
962     (aid BIGINT, bid BIGINT) RETURNS BIGINT AS $func$
963 DECLARE
964     bib_forced  BOOL := FALSE;
965     bib_rec     biblio.record_entry%ROWTYPE;
966     new_marc    TEXT;
967 BEGIN
968
969     SELECT INTO bib_rec * FROM biblio.record_entry WHERE id = bid;
970
971     new_marc := vandelay.merge_record_xml(
972         bib_rec.marc, authority.generate_overlay_template(aid));
973
974     IF new_marc = bib_rec.marc THEN
975         -- Authority record change had no impact on this bib record.
976         -- Nothing left to do.
977         RETURN aid;
978     END IF;
979
980     PERFORM 1 FROM config.global_flag
981         WHERE name = 'ingest.disable_authority_auto_update_bib_meta'
982             AND enabled;
983
984     IF NOT FOUND THEN
985         -- update the bib record editor and edit_date
986         bib_rec.editor := (
987             SELECT editor FROM authority.record_entry WHERE id = aid);
988         bib_rec.edit_date = NOW();
989     END IF;
990
991     PERFORM action.set_queued_ingest_force('ingest.queued.biblio.update.disabled');
992
993     UPDATE biblio.record_entry SET
994         marc = new_marc,
995         editor = bib_rec.editor,
996         edit_date = bib_rec.edit_date
997     WHERE id = bid;
998
999     PERFORM action.clear_queued_ingest_force();
1000
1001     RETURN aid;
1002
1003 END;
1004 $func$ LANGUAGE PLPGSQL;
1005
1006 CREATE OR REPLACE FUNCTION metabib.reingest_metabib_field_entries(
1007     bib_id BIGINT,
1008     skip_facet BOOL DEFAULT FALSE,
1009     skip_display BOOL DEFAULT FALSE,
1010     skip_browse BOOL DEFAULT FALSE,
1011     skip_search BOOL DEFAULT FALSE,
1012     only_fields INT[] DEFAULT '{}'::INT[]
1013 ) RETURNS VOID AS $func$
1014 DECLARE
1015     fclass          RECORD;
1016     ind_data        metabib.field_entry_template%ROWTYPE;
1017     mbe_row         metabib.browse_entry%ROWTYPE;
1018     mbe_id          BIGINT;
1019     b_skip_facet    BOOL;
1020     b_skip_display    BOOL;
1021     b_skip_browse   BOOL;
1022     b_skip_search   BOOL;
1023     value_prepped   TEXT;
1024     field_list      INT[] := only_fields;
1025     field_types     TEXT[] := '{}'::TEXT[];
1026 BEGIN
1027
1028     IF field_list = '{}'::INT[] THEN
1029         SELECT ARRAY_AGG(id) INTO field_list FROM config.metabib_field;
1030     END IF;
1031
1032     SELECT COALESCE(NULLIF(skip_facet, FALSE), EXISTS (SELECT enabled FROM config.internal_flag WHERE name =  'ingest.skip_facet_indexing' AND enabled)) INTO b_skip_facet;
1033     SELECT COALESCE(NULLIF(skip_display, FALSE), EXISTS (SELECT enabled FROM config.internal_flag WHERE name =  'ingest.skip_display_indexing' AND enabled)) INTO b_skip_display;
1034     SELECT COALESCE(NULLIF(skip_browse, FALSE), EXISTS (SELECT enabled FROM config.internal_flag WHERE name =  'ingest.skip_browse_indexing' AND enabled)) INTO b_skip_browse;
1035     SELECT COALESCE(NULLIF(skip_search, FALSE), EXISTS (SELECT enabled FROM config.internal_flag WHERE name =  'ingest.skip_search_indexing' AND enabled)) INTO b_skip_search;
1036
1037     IF NOT b_skip_facet THEN field_types := field_types || '{facet}'; END IF;
1038     IF NOT b_skip_display THEN field_types := field_types || '{display}'; END IF;
1039     IF NOT b_skip_browse THEN field_types := field_types || '{browse}'; END IF;
1040     IF NOT b_skip_search THEN field_types := field_types || '{search}'; END IF;
1041
1042     PERFORM * FROM config.internal_flag WHERE name = 'ingest.assume_inserts_only' AND enabled;
1043     IF NOT FOUND THEN
1044         IF NOT b_skip_search THEN
1045             FOR fclass IN SELECT * FROM config.metabib_class LOOP
1046                 EXECUTE $$DELETE FROM metabib.$$ || fclass.name || $$_field_entry WHERE source = $$ || bib_id || $$ AND field = ANY($1)$$ USING field_list;
1047             END LOOP;
1048         END IF;
1049         IF NOT b_skip_facet THEN
1050             DELETE FROM metabib.facet_entry WHERE source = bib_id AND field = ANY(field_list);
1051         END IF;
1052         IF NOT b_skip_display THEN
1053             DELETE FROM metabib.display_entry WHERE source = bib_id AND field = ANY(field_list);
1054         END IF;
1055         IF NOT b_skip_browse THEN
1056             DELETE FROM metabib.browse_entry_def_map WHERE source = bib_id AND def = ANY(field_list);
1057         END IF;
1058     END IF;
1059
1060     FOR ind_data IN SELECT * FROM biblio.extract_metabib_field_entry( bib_id, ' ', field_types, field_list ) LOOP
1061
1062         -- don't store what has been normalized away
1063         CONTINUE WHEN ind_data.value IS NULL;
1064
1065         IF ind_data.field < 0 THEN
1066             ind_data.field = -1 * ind_data.field;
1067         END IF;
1068
1069         IF ind_data.facet_field AND NOT b_skip_facet THEN
1070             INSERT INTO metabib.facet_entry (field, source, value)
1071                 VALUES (ind_data.field, ind_data.source, ind_data.value);
1072         END IF;
1073
1074         IF ind_data.display_field AND NOT b_skip_display THEN
1075             INSERT INTO metabib.display_entry (field, source, value)
1076                 VALUES (ind_data.field, ind_data.source, ind_data.value);
1077         END IF;
1078
1079
1080         IF ind_data.browse_field AND NOT b_skip_browse THEN
1081             -- A caveat about this SELECT: this should take care of replacing
1082             -- old mbe rows when data changes, but not if normalization (by
1083             -- which I mean specifically the output of
1084             -- evergreen.oils_tsearch2()) changes.  It may or may not be
1085             -- expensive to add a comparison of index_vector to index_vector
1086             -- to the WHERE clause below.
1087
1088             CONTINUE WHEN ind_data.sort_value IS NULL;
1089
1090             value_prepped := metabib.browse_normalize(ind_data.value, ind_data.field);
1091             IF ind_data.browse_nocase THEN -- for "nocase" browse definions, look for a preexisting row that matches case-insensitively on value and use that
1092                 SELECT INTO mbe_row * FROM metabib.browse_entry
1093                     WHERE evergreen.lowercase(value) = evergreen.lowercase(value_prepped) AND sort_value = ind_data.sort_value
1094                     ORDER BY sort_value, value LIMIT 1; -- gotta pick something, I guess
1095             END IF;
1096
1097             IF mbe_row.id IS NOT NULL THEN -- asked to check for, and found, a "nocase" version to use
1098                 mbe_id := mbe_row.id;
1099             ELSE -- otherwise, an UPSERT-protected variant
1100                 INSERT INTO metabib.browse_entry
1101                     ( value, sort_value ) VALUES
1102                     ( value_prepped, ind_data.sort_value )
1103                   ON CONFLICT (sort_value, value) DO UPDATE SET sort_value = EXCLUDED.sort_value -- must update a row to return an existing id
1104                   RETURNING id INTO mbe_id;
1105             END IF;
1106
1107             INSERT INTO metabib.browse_entry_def_map (entry, def, source, authority)
1108                 VALUES (mbe_id, ind_data.field, ind_data.source, ind_data.authority);
1109         END IF;
1110
1111         IF ind_data.search_field AND NOT b_skip_search THEN
1112             -- Avoid inserting duplicate rows
1113             EXECUTE 'SELECT 1 FROM metabib.' || ind_data.field_class ||
1114                 '_field_entry WHERE field = $1 AND source = $2 AND value = $3'
1115                 INTO mbe_id USING ind_data.field, ind_data.source, ind_data.value;
1116                 -- RAISE NOTICE 'Search for an already matching row returned %', mbe_id;
1117             IF mbe_id IS NULL THEN
1118                 EXECUTE $$
1119                 INSERT INTO metabib.$$ || ind_data.field_class || $$_field_entry (field, source, value)
1120                     VALUES ($$ ||
1121                         quote_literal(ind_data.field) || $$, $$ ||
1122                         quote_literal(ind_data.source) || $$, $$ ||
1123                         quote_literal(ind_data.value) ||
1124                     $$);$$;
1125             END IF;
1126         END IF;
1127
1128     END LOOP;
1129
1130     IF NOT b_skip_search THEN
1131         PERFORM metabib.update_combined_index_vectors(bib_id);
1132         PERFORM * FROM config.internal_flag WHERE name = 'ingest.disable_symspell_reification' AND enabled;
1133         IF NOT FOUND THEN
1134             PERFORM search.symspell_dictionary_reify();
1135         END IF;
1136     END IF;
1137
1138     RETURN;
1139 END;
1140 $func$ LANGUAGE PLPGSQL;
1141
1142 -- get rid of old version
1143 DROP FUNCTION authority.indexing_ingest_or_delete;
1144
1145 COMMIT;
1146