LP#1979071: Queued Ingest
[evergreen-equinox.git] / Open-ILS / src / sql / Pg / upgrade / XXXX.schema.queued_ingest.sql
1 BEGIN;
2
3 INSERT INTO config.global_flag (name, enabled, label) VALUES (
4     'ingest.queued.max_threads',  TRUE,
5     oils_i18n_gettext(
6         'ingest.queued.max_threads',
7         'Queued Ingest: Maximum number of database workers allowed for queued ingest processes',
8         'cgf',
9         'label'
10     )),(
11     'ingest.queued.abort_on_error',  FALSE,
12     oils_i18n_gettext(
13         'ingest.queued.abort_on_error',
14         'Queued Ingest: Abort transaction on ingest error rather than simply logging an error',
15         'cgf',
16         'label'
17     )),(
18     'ingest.queued.authority.propagate',  FALSE,
19     oils_i18n_gettext(
20         'ingest.queued.authority.propagate',
21         'Queued Ingest: Queue all bib record updates on authority change propagation, even if bib queuing is not generally enabled',
22         'cgf',
23         'label'
24     )),(
25     'ingest.queued.all',  FALSE,
26     oils_i18n_gettext(
27         'ingest.queued.all',
28         'Queued Ingest: Use Queued Ingest for all bib and authority record ingest',
29         'cgf',
30         'label'
31     )),(
32     'ingest.queued.biblio.all',  FALSE,
33     oils_i18n_gettext(
34         'ingest.queued.biblio.all',
35         'Queued Ingest: Use Queued Ingest for all bib record ingest',
36         'cgf',
37         'label'
38     )),(
39     'ingest.queued.authority.all',  FALSE,
40     oils_i18n_gettext(
41         'ingest.queued.authority.all',
42         'Queued Ingest: Use Queued Ingest for all authority record ingest',
43         'cgf',
44         'label'
45     )),(
46     'ingest.queued.biblio.insert.marc_edit_inline',  TRUE,
47     oils_i18n_gettext(
48         'ingest.queued.biblio.insert.marc_edit_inline',
49         'Queued Ingest: Do NOT use Queued Ingest when creating a new bib, or undeleting a bib, via the MARC editor',
50         'cgf',
51         'label'
52     )),(
53     'ingest.queued.biblio.insert',  FALSE,
54     oils_i18n_gettext(
55         'ingest.queued.biblio.insert',
56         'Queued Ingest: Use Queued Ingest for bib record ingest on insert and undelete',
57         'cgf',
58         'label'
59     )),(
60     'ingest.queued.authority.insert',  FALSE,
61     oils_i18n_gettext(
62         'ingest.queued.authority.insert',
63         'Queued Ingest: Use Queued Ingest for authority record ingest on insert and undelete',
64         'cgf',
65         'label'
66     )),(
67     'ingest.queued.biblio.update.marc_edit_inline',  TRUE,
68     oils_i18n_gettext(
69         'ingest.queued.biblio.update.marc_edit_inline',
70         'Queued Ingest: Do NOT Use Queued Ingest when editing bib records via the MARC Editor',
71         'cgf',
72         'label'
73     )),(
74     'ingest.queued.biblio.update',  FALSE,
75     oils_i18n_gettext(
76         'ingest.queued.biblio.update',
77         'Queued Ingest: Use Queued Ingest for bib record ingest on update',
78         'cgf',
79         'label'
80     )),(
81     'ingest.queued.authority.update',  FALSE,
82     oils_i18n_gettext(
83         'ingest.queued.authority.update',
84         'Queued Ingest: Use Queued Ingest for authority record ingest on update',
85         'cgf',
86         'label'
87     )),(
88     'ingest.queued.biblio.delete',  FALSE,
89     oils_i18n_gettext(
90         'ingest.queued.biblio.delete',
91         'Queued Ingest: Use Queued Ingest for bib record ingest on delete',
92         'cgf',
93         'label'
94     )),(
95     'ingest.queued.authority.delete',  FALSE,
96     oils_i18n_gettext(
97         'ingest.queued.authority.delete',
98         'Queued Ingest: Use Queued Ingest for authority record ingest on delete',
99         'cgf',
100         'label'
101     )
102 );
103
104 UPDATE config.global_flag SET value = '20' WHERE name = 'ingest.queued.max_threads';
105
106 CREATE TABLE action.ingest_queue (
107     id          SERIAL      PRIMARY KEY,
108     created     TIMESTAMPTZ NOT NULL DEFAULT NOW(),
109     run_at      TIMESTAMPTZ NOT NULL DEFAULT NOW(),
110     who         INT         REFERENCES actor.usr (id) ON UPDATE CASCADE ON DELETE SET NULL DEFERRABLE INITIALLY DEFERRED,
111     start_time  TIMESTAMPTZ,
112     end_time    TIMESTAMPTZ,
113     threads     INT,
114     why         TEXT
115 );
116
117 CREATE TABLE action.ingest_queue_entry (
118     id          BIGSERIAL   PRIMARY KEY,
119     record      BIGINT      NOT NULL, -- points to a record id of the appropriate record_type
120     record_type TEXT        NOT NULL,
121     action      TEXT        NOT NULL,
122     run_at      TIMESTAMPTZ NOT NULL DEFAULT NOW(),
123     state_data  TEXT        NOT NULL DEFAULT '',
124     queue       INT         REFERENCES action.ingest_queue (id) ON UPDATE CASCADE ON DELETE CASCADE DEFERRABLE INITIALLY DEFERRED,
125     override_by BIGINT      REFERENCES action.ingest_queue_entry (id) ON UPDATE CASCADE ON DELETE SET NULL DEFERRABLE INITIALLY DEFERRED,
126     ingest_time TIMESTAMPTZ,
127     fail_time   TIMESTAMPTZ
128 );
129 CREATE UNIQUE INDEX record_pending_once ON action.ingest_queue_entry (record_type,record,state_data) WHERE ingest_time IS NULL AND override_by IS NULL;
130 CREATE INDEX entry_override_by_idx ON action.ingest_queue_entry (override_by) WHERE override_by IS NOT NULL;
131
132 CREATE OR REPLACE FUNCTION action.enqueue_ingest_entry (
133     record_id       BIGINT,
134     rtype           TEXT DEFAULT 'biblio',
135     when_to_run     TIMESTAMPTZ DEFAULT NOW(),
136     queue_id        INT  DEFAULT NULL,
137     ingest_action   TEXT DEFAULT 'update', -- will be the most common?
138     old_state_data  TEXT DEFAULT ''
139 ) RETURNS BOOL AS $F$
140 DECLARE
141     new_entry       action.ingest_queue_entry%ROWTYPE;
142     prev_del_entry  action.ingest_queue_entry%ROWTYPE;
143     diag_detail     TEXT;
144     diag_context    TEXT;
145 BEGIN
146
147     IF ingest_action = 'delete' THEN
148         -- first see if there is an outstanding entry
149         SELECT  * INTO prev_del_entry
150           FROM  action.ingest_queue_entry
151           WHERE qe.record = record_id
152                 AND qe.state_date = old_state_data
153                 AND qe.record_type = rtype
154                 AND qe.ingest_time IS NULL
155                 AND qe.override_by IS NULL;
156     END IF;
157
158     WITH existing_queue_entry_cte AS (
159         SELECT  queue_id AS queue,
160                 rtype AS record_type,
161                 record_id AS record,
162                 qe.id AS override_by,
163                 ingest_action AS action,
164                 q.run_at AS run_at,
165                 old_state_data AS state_data
166           FROM  action.ingest_queue_entry qe
167                 JOIN action.ingest_queue q ON (qe.queue = q.id)
168           WHERE qe.record = record_id
169                 AND q.end_time IS NULL
170                 AND qe.record_type = rtype
171                 AND qe.state_data = old_state_data
172                 AND qe.ingest_time IS NULL
173                 AND qe.fail_time IS NULL
174                 AND qe.override_by IS NULL
175     ), existing_nonqueue_entry_cte AS (
176         SELECT  queue_id AS queue,
177                 rtype AS record_type,
178                 record_id AS record,
179                 qe.id AS override_by,
180                 ingest_action AS action,
181                 qe.run_at AS run_at,
182                 old_state_data AS state_data
183           FROM  action.ingest_queue_entry qe
184           WHERE qe.record = record_id
185                 AND qe.queue IS NULL
186                 AND qe.record_type = rtype
187                 AND qe.state_data = old_state_data
188                 AND qe.ingest_time IS NULL
189                 AND qe.fail_time IS NULL
190                 AND qe.override_by IS NULL
191     ), new_entry_cte AS (
192         SELECT * FROM existing_queue_entry_cte
193           UNION ALL
194         SELECT * FROM existing_nonqueue_entry_cte
195           UNION ALL
196         SELECT queue_id, rtype, record_id, NULL, ingest_action, COALESCE(when_to_run,NOW()), old_state_data
197     ), insert_entry_cte AS (
198         INSERT INTO action.ingest_queue_entry
199             (queue, record_type, record, override_by, action, run_at, state_data)
200           SELECT queue, record_type, record, override_by, action, run_at, state_data FROM new_entry_cte
201             ORDER BY 4 NULLS LAST, 6
202             LIMIT 1
203         RETURNING *
204     ) SELECT * INTO new_entry FROM insert_entry_cte;
205
206     IF prev_del_entry.id IS NOT NULL THEN -- later delete overrides earlier unapplied entry
207         UPDATE  action.ingest_queue_entry
208           SET   override_by = new_entry.id
209           WHERE id = prev_del_entry.id;
210
211         UPDATE  action.ingest_queue_entry
212           SET   override_by = NULL
213           WHERE id = new_entry.id;
214
215     ELSIF new_entry.override_by IS NOT NULL THEN
216         RETURN TRUE; -- already handled, don't notify
217     END IF;
218
219     NOTIFY queued_ingest;
220
221     RETURN TRUE;
222 EXCEPTION WHEN OTHERS THEN
223     GET STACKED DIAGNOSTICS diag_detail  = PG_EXCEPTION_DETAIL,
224                             diag_context = PG_EXCEPTION_CONTEXT;
225     RAISE WARNING '%\n%', diag_detail, diag_context;
226     RETURN FALSE;
227 END;
228 $F$ LANGUAGE PLPGSQL;
229
230 CREATE OR REPLACE FUNCTION action.process_ingest_queue_entry (qeid BIGINT) RETURNS BOOL AS $func$
231 DECLARE
232     ingest_success  BOOL := NULL;
233     qe              action.ingest_queue_entry%ROWTYPE;
234 BEGIN
235
236     SELECT * INTO qe FROM action.ingest_queue_entry WHERE id = qeid;
237     IF qe.ingest_time IS NOT NULL OR qe.override_by IS NOT NULL THEN
238         RETURN TRUE; -- Already done
239     END IF;
240
241     IF qe.action = 'delete' THEN
242         IF qe.record_type = 'biblio' THEN
243             SELECT metabib.indexing_delete(r.*, qe.state_data) INTO ingest_success FROM biblio.record_entry r WHERE r.id = qe.record;
244         ELSIF qe.record_type = 'authority' THEN
245             SELECT authority.indexing_delete(r.*, qe.state_data) INTO ingest_success FROM authority.record_entry r WHERE r.id = qe.record;
246         END IF;
247     ELSE
248         IF qe.record_type = 'biblio' THEN
249             IF qe.action = 'propagate' THEN
250                 SELECT authority.apply_propagate_changes(qe.state_data::BIGINT, qe.record) INTO ingest_success;
251             ELSE
252                 SELECT metabib.indexing_update(r.*, qe.action = 'insert', qe.state_data) INTO ingest_success FROM biblio.record_entry r WHERE r.id = qe.record;
253             END IF;
254         ELSIF qe.record_type = 'authority' THEN
255             SELECT authority.indexing_update(r.*, qe.action = 'insert', qe.state_data) INTO ingest_success FROM authority.record_entry r WHERE r.id = qe.record;
256         END IF;
257     END IF;
258
259     IF NOT ingest_success THEN
260         UPDATE action.ingest_queue_entry SET fail_time = NOW() WHERE id = qe.id;
261         PERFORM * FROM config.internal_flag WHERE name = 'ingest.queued.abort_on_error' AND enabled;
262         IF FOUND THEN
263             RAISE EXCEPTION 'Ingest action of % on %.record_entry % for queue entry % failed', qe.action, qe.record_type, qe.record, qe.id;
264         ELSE
265             RAISE WARNING 'Ingest action of % on %.record_entry % for queue entry % failed', qe.action, qe.record_type, qe.record, qe.id;
266         END IF;
267     ELSE
268         UPDATE action.ingest_queue_entry SET ingest_time = NOW() WHERE id = qe.id;
269     END IF;
270
271     RETURN ingest_success;
272 END;
273 $func$ LANGUAGE PLPGSQL;
274
275
276 CREATE OR REPLACE FUNCTION action.complete_duplicated_entries () RETURNS TRIGGER AS $F$
277 BEGIN
278     IF NEW.ingest_time IS NOT NULL THEN
279         UPDATE action.ingest_queue_entry SET ingest_time = NEW.ingest_time WHERE override_by = NEW.id;
280     END IF;
281
282     RETURN NULL;
283 END;
284 $F$ LANGUAGE PLPGSQL;
285
286 CREATE TRIGGER complete_duplicated_entries_trigger
287     AFTER UPDATE ON action.ingest_queue_entry
288     FOR EACH ROW WHEN (NEW.override_by IS NULL)
289     EXECUTE PROCEDURE action.complete_duplicated_entries();
290
291 CREATE OR REPLACE FUNCTION action.set_ingest_queue(INT) RETURNS VOID AS $$
292     $_SHARED{"ingest_queue_id"} = $_[0];
293 $$ LANGUAGE plperlu;
294
295 CREATE OR REPLACE FUNCTION action.get_ingest_queue() RETURNS INT AS $$
296     return $_SHARED{"ingest_queue_id"};
297 $$ LANGUAGE plperlu;
298
299 CREATE OR REPLACE FUNCTION action.clear_ingest_queue() RETURNS VOID AS $$
300     delete($_SHARED{"ingest_queue_id"});
301 $$ LANGUAGE plperlu;
302
303 CREATE OR REPLACE FUNCTION action.set_queued_ingest_force(TEXT) RETURNS VOID AS $$
304     $_SHARED{"ingest_queue_force"} = $_[0];
305 $$ LANGUAGE plperlu;
306
307 CREATE OR REPLACE FUNCTION action.get_queued_ingest_force() RETURNS TEXT AS $$
308     return $_SHARED{"ingest_queue_force"};
309 $$ LANGUAGE plperlu;
310
311 CREATE OR REPLACE FUNCTION action.clear_queued_ingest_force() RETURNS VOID AS $$
312     delete($_SHARED{"ingest_queue_force"});
313 $$ LANGUAGE plperlu;
314
315 ------------------ ingest functions ------------------
316
317 CREATE OR REPLACE FUNCTION metabib.indexing_delete (bib biblio.record_entry, extra TEXT DEFAULT NULL) RETURNS BOOL AS $func$
318 DECLARE
319     tmp_bool BOOL;
320     diag_detail     TEXT;
321     diag_context    TEXT;
322 BEGIN
323     PERFORM * FROM config.internal_flag WHERE name = 'ingest.metarecord_mapping.preserve_on_delete' AND enabled;
324     tmp_bool := FOUND;
325
326     PERFORM metabib.remap_metarecord_for_bib(bib.id, bib.fingerprint, TRUE, tmp_bool);
327
328     IF NOT tmp_bool THEN
329         -- One needs to keep these around to support searches
330         -- with the #deleted modifier, so one should turn on the named
331         -- internal flag for that functionality.
332         DELETE FROM metabib.record_attr_vector_list WHERE source = bib.id;
333     END IF;
334
335     DELETE FROM authority.bib_linking WHERE bib = bib.id; -- Avoid updating fields in bibs that are no longer visible
336     DELETE FROM biblio.peer_bib_copy_map WHERE peer_record = bib.id; -- Separate any multi-homed items
337     DELETE FROM metabib.browse_entry_def_map WHERE source = bib.id; -- Don't auto-suggest deleted bibs
338
339     RETURN TRUE;
340 EXCEPTION WHEN OTHERS THEN
341     GET STACKED DIAGNOSTICS diag_detail  = PG_EXCEPTION_DETAIL,
342                             diag_context = PG_EXCEPTION_CONTEXT;
343     RAISE WARNING '%\n%', diag_detail, diag_context;
344     RETURN FALSE;
345 END;
346 $func$ LANGUAGE PLPGSQL;
347
348 CREATE OR REPLACE FUNCTION metabib.indexing_update (bib biblio.record_entry, insert_only BOOL DEFAULT FALSE, extra TEXT DEFAULT NULL) RETURNS BOOL AS $func$
349 DECLARE
350     skip_facet   BOOL   := FALSE;
351     skip_display BOOL   := FALSE;
352     skip_browse  BOOL   := FALSE;
353     skip_search  BOOL   := FALSE;
354     skip_auth    BOOL   := FALSE;
355     skip_full    BOOL   := FALSE;
356     skip_attrs   BOOL   := FALSE;
357     skip_luri    BOOL   := FALSE;
358     skip_mrmap   BOOL   := FALSE;
359     only_attrs   TEXT[] := NULL;
360     only_fields  INT[]  := '{}'::INT[];
361     diag_detail     TEXT;
362     diag_context    TEXT;
363 BEGIN
364
365     -- Record authority linking
366     SELECT extra LIKE '%skip_authority%' INTO skip_auth;
367     PERFORM * FROM config.internal_flag WHERE name = 'ingest.disable_authority_linking' AND enabled;
368     IF NOT FOUND AND NOT skip_auth THEN
369         PERFORM biblio.map_authority_linking( bib.id, bib.marc );
370     END IF;
371
372     -- Flatten and insert the mfr data
373     SELECT extra LIKE '%skip_full_rec%' INTO skip_full;
374     PERFORM * FROM config.internal_flag WHERE name = 'ingest.disable_metabib_full_rec' AND enabled;
375     IF NOT FOUND AND NOT skip_full THEN
376         PERFORM metabib.reingest_metabib_full_rec(bib.id);
377     END IF;
378
379     -- Now we pull out attribute data, which is dependent on the mfr for all but XPath-based fields
380     SELECT extra LIKE '%skip_attrs%' INTO skip_attrs;
381     PERFORM * FROM config.internal_flag WHERE name = 'ingest.disable_metabib_rec_descriptor' AND enabled;
382     IF NOT FOUND AND NOT skip_attrs THEN
383         IF extra ~ 'attr\(\s*(\w[ ,\w]*?)\s*\)' THEN
384             SELECT REGEXP_SPLIT_TO_ARRAY(
385                 (REGEXP_MATCHES(extra, 'attr\(\s*(\w[ ,\w]*?)\s*\)'))[1],
386                 '\s*,\s*'
387             ) INTO only_attrs;
388         END IF;
389
390         PERFORM metabib.reingest_record_attributes(bib.id, only_attrs, bib.marc, insert_only);
391     END IF;
392
393     -- Gather and insert the field entry data
394     SELECT extra LIKE '%skip_facet%' INTO skip_facet;
395     SELECT extra LIKE '%skip_display%' INTO skip_display;
396     SELECT extra LIKE '%skip_browse%' INTO skip_browse;
397     SELECT extra LIKE '%skip_search%' INTO skip_search;
398
399     IF extra ~ 'field_list\(\s*(\d[ ,\d]+)\s*\)' THEN
400         SELECT REGEXP_SPLIT_TO_ARRAY(
401             (REGEXP_MATCHES(extra, 'field_list\(\s*(\d[ ,\d]+)\s*\)'))[1],
402             '\s*,\s*'
403         )::INT[] INTO only_fields;
404     END IF;
405
406     IF NOT skip_facet OR NOT skip_display OR NOT skip_browse OR NOT skip_search THEN
407         PERFORM metabib.reingest_metabib_field_entries(bib.id, skip_facet, skip_display, skip_browse, skip_search, only_fields);
408     END IF;
409
410     -- Located URI magic
411     SELECT extra LIKE '%skip_luri%' INTO skip_luri;
412     PERFORM * FROM config.internal_flag WHERE name = 'ingest.disable_located_uri' AND enabled;
413     IF NOT FOUND AND NOT skip_luri THEN PERFORM biblio.extract_located_uris( bib.id, bib.marc, bib.editor ); END IF;
414
415     -- (re)map metarecord-bib linking
416     SELECT extra LIKE '%skip_mrmap%' INTO skip_mrmap;
417     IF insert_only THEN -- if not deleted and performing an insert, check for the flag
418         PERFORM * FROM config.internal_flag WHERE name = 'ingest.metarecord_mapping.skip_on_insert' AND enabled;
419         IF NOT FOUND AND NOT skip_mrmap THEN
420             PERFORM metabib.remap_metarecord_for_bib( bib.id, bib.fingerprint );
421         END IF;
422     ELSE -- we're doing an update, and we're not deleted, remap
423         PERFORM * FROM config.internal_flag WHERE name = 'ingest.metarecord_mapping.skip_on_update' AND enabled;
424         IF NOT FOUND AND NOT skip_mrmap THEN
425             PERFORM metabib.remap_metarecord_for_bib( bib.id, bib.fingerprint );
426         END IF;
427     END IF;
428
429     RETURN TRUE;
430 EXCEPTION WHEN OTHERS THEN
431     GET STACKED DIAGNOSTICS diag_detail  = PG_EXCEPTION_DETAIL,
432                             diag_context = PG_EXCEPTION_CONTEXT;
433     RAISE WARNING '%\n%', diag_detail, diag_context;
434     RETURN FALSE;
435 END;
436 $func$ LANGUAGE PLPGSQL;
437
438 CREATE OR REPLACE FUNCTION authority.indexing_delete (auth authority.record_entry, extra TEXT DEFAULT NULL) RETURNS BOOL AS $func$
439 DECLARE
440     tmp_bool BOOL;
441     diag_detail     TEXT;
442     diag_context    TEXT;
443 BEGIN
444     DELETE FROM authority.bib_linking WHERE authority = NEW.id; -- Avoid updating fields in bibs that are no longer visible
445     DELETE FROM authority.full_rec WHERE record = NEW.id; -- Avoid validating fields against deleted authority records
446     DELETE FROM authority.simple_heading WHERE record = NEW.id;
447       -- Should remove matching $0 from controlled fields at the same time?
448
449     -- XXX What do we about the actual linking subfields present in
450     -- authority records that target this one when this happens?
451     DELETE FROM authority.authority_linking WHERE source = NEW.id OR target = NEW.id;
452
453     RETURN TRUE;
454 EXCEPTION WHEN OTHERS THEN
455     GET STACKED DIAGNOSTICS diag_detail  = PG_EXCEPTION_DETAIL,
456                             diag_context = PG_EXCEPTION_CONTEXT;
457     RAISE WARNING '%\n%', diag_detail, diag_context;
458     RETURN FALSE;
459 END;
460 $func$ LANGUAGE PLPGSQL;
461
462
463 CREATE OR REPLACE FUNCTION authority.indexing_update (auth authority.record_entry, insert_only BOOL DEFAULT FALSE, old_heading TEXT DEFAULT NULL) RETURNS BOOL AS $func$
464 DECLARE
465     ashs    authority.simple_heading%ROWTYPE;
466     mbe_row metabib.browse_entry%ROWTYPE;
467     mbe_id  BIGINT;
468     ash_id  BIGINT;
469     diag_detail     TEXT;
470     diag_context    TEXT;
471 BEGIN
472
473     -- Unless there's a setting stopping us, propagate these updates to any linked bib records when the heading changes
474     PERFORM * FROM config.internal_flag WHERE name = 'ingest.disable_authority_auto_update' AND enabled;
475
476     IF NOT FOUND AND auth.heading <> old_heading THEN
477         PERFORM authority.propagate_changes(auth.id);
478     END IF;
479
480     IF NOT insert_only THEN
481         DELETE FROM authority.authority_linking WHERE source = auth.id;
482         DELETE FROM authority.simple_heading WHERE record = auth.id;
483     END IF;
484
485     INSERT INTO authority.authority_linking (source, target, field)
486         SELECT source, target, field FROM authority.calculate_authority_linking(
487             auth.id, auth.control_set, auth.marc::XML
488         );
489
490     FOR ashs IN SELECT * FROM authority.simple_heading_set(auth.marc) LOOP
491
492         INSERT INTO authority.simple_heading (record,atag,value,sort_value,thesaurus)
493             VALUES (ashs.record, ashs.atag, ashs.value, ashs.sort_value, ashs.thesaurus);
494             ash_id := CURRVAL('authority.simple_heading_id_seq'::REGCLASS);
495
496         SELECT INTO mbe_row * FROM metabib.browse_entry
497             WHERE value = ashs.value AND sort_value = ashs.sort_value;
498
499         IF FOUND THEN
500             mbe_id := mbe_row.id;
501         ELSE
502             INSERT INTO metabib.browse_entry
503                 ( value, sort_value ) VALUES
504                 ( ashs.value, ashs.sort_value );
505
506             mbe_id := CURRVAL('metabib.browse_entry_id_seq'::REGCLASS);
507         END IF;
508
509         INSERT INTO metabib.browse_entry_simple_heading_map (entry,simple_heading) VALUES (mbe_id,ash_id);
510
511     END LOOP;
512
513     -- Flatten and insert the afr data
514     PERFORM * FROM config.internal_flag WHERE name = 'ingest.disable_authority_full_rec' AND enabled;
515     IF NOT FOUND THEN
516         PERFORM authority.reingest_authority_full_rec(auth.id);
517         PERFORM * FROM config.internal_flag WHERE name = 'ingest.disable_authority_rec_descriptor' AND enabled;
518         IF NOT FOUND THEN
519             PERFORM authority.reingest_authority_rec_descriptor(auth.id);
520         END IF;
521     END IF;
522
523     RETURN TRUE;
524 EXCEPTION WHEN OTHERS THEN
525     GET STACKED DIAGNOSTICS diag_detail  = PG_EXCEPTION_DETAIL,
526                             diag_context = PG_EXCEPTION_CONTEXT;
527     RAISE WARNING '%\n%', diag_detail, diag_context;
528     RETURN FALSE;
529 END;
530 $func$ LANGUAGE PLPGSQL;
531
532 CREATE OR REPLACE FUNCTION evergreen.indexing_ingest_or_delete () RETURNS TRIGGER AS $func$
533 DECLARE
534     old_state_data      TEXT := '';
535     new_action          TEXT;
536     queuing_force       TEXT;
537     queuing_flag_name   TEXT;
538     queuing_flag        BOOL := FALSE;
539     queuing_success     BOOL := FALSE;
540     ingest_success      BOOL := FALSE;
541     ingest_queue        INT;
542 BEGIN
543
544     -- Identify the ingest action type
545     IF TG_OP = 'UPDATE' THEN
546
547         -- Gather type-specific data for later use
548         IF TG_TABLE_SCHEMA = 'authority' THEN
549             old_state_data = OLD.heading;
550         END IF;
551
552         IF NOT OLD.deleted THEN -- maybe reingest?
553             IF NEW.deleted THEN
554                 new_action = 'delete'; -- nope, delete
555             ELSE
556                 new_action = 'update'; -- yes, update
557             END IF;
558         ELSIF NOT NEW.deleted THEN
559             new_action = 'insert'; -- revivify, AKA insert
560         ELSE
561             RETURN NEW; -- was and is still deleted, don't ingest
562         END IF;
563     ELSIF TG_OP = 'INSERT' THEN
564         new_action = 'insert'; -- brand new
565     ELSE
566         RETURN OLD; -- really deleting the record
567     END IF;
568
569     queuing_flag_name := 'ingest.queued.'||TG_TABLE_SCHEMA||'.'||new_action;
570     -- See if we should be queuing anything
571     SELECT  enabled INTO queuing_flag
572       FROM  config.internal_flag
573       WHERE name IN ('ingest.queued.all','ingest.queued.'||TG_TABLE_SCHEMA||'.all', queuing_flag_name)
574             AND enabled
575       LIMIT 1;
576
577     SELECT action.get_queued_ingest_force() INTO queuing_force;
578     IF queuing_flag IS NULL AND queuing_force = queuing_flag_name THEN
579         queuing_flag := TRUE;
580     END IF;
581
582     -- you (or part of authority propagation) can forcibly disable specific queuing actions
583     IF queuing_force = queuing_flag_name||'.disabled' THEN
584         queuing_flag := FALSE;
585     END IF;
586
587     -- And if we should be queuing ...
588     IF queuing_flag THEN
589         ingest_queue := action.get_ingest_queue();
590
591         -- ... but this is NOT a named or forced queue request (marc editor update, say, or vandelay overlay)...
592         IF queuing_force IS NULL AND ingest_queue IS NULL AND new_action = 'update' THEN -- re-ingest?
593
594             PERFORM * FROM config.internal_flag WHERE name = 'ingest.reingest.force_on_same_marc' AND enabled;
595
596             --  ... then don't do anything if ingest.reingest.force_on_same_marc is not enabled and the MARC hasn't changed
597             IF NOT FOUND AND OLD.marc = NEW.marc THEN
598                 RETURN NEW;
599             END IF;
600         END IF;
601
602         -- Otherwise, attempt to enqueue
603         SELECT action.enqueue_ingest_entry( NEW.id, TG_TABLE_SCHEMA, NOW(), ingest_queue, new_action, old_state_data) INTO queuing_success;
604     END IF;
605
606     -- If queuing was not requested, or failed for some reason, do it live.
607     IF NOT queuing_success THEN
608         IF queuing_flag THEN
609             RAISE WARNING 'Enqueuing of %.record_entry % for ingest failed, attempting direct ingest', TG_TABLE_SCHEMA, NEW.id;
610         END IF;
611
612         IF new_action = 'delete' THEN
613             IF TG_TABLE_SCHEMA = 'biblio' THEN
614                 SELECT metabib.indexing_delete(NEW.*, old_state_data) INTO ingest_success;
615             ELSIF TG_TABLE_SCHEMA = 'authority' THEN
616                 SELECT authority.indexing_delete(NEW.*, old_state_data) INTO ingest_success;
617             END IF;
618         ELSE
619             IF TG_TABLE_SCHEMA = 'biblio' THEN
620                 SELECT metabib.indexing_update(NEW.*, new_action = 'insert', old_state_data) INTO ingest_success;
621             ELSIF TG_TABLE_SCHEMA = 'authority' THEN
622                 SELECT authority.indexing_update(NEW.*, new_action = 'insert', old_state_data) INTO ingest_success;
623             END IF;
624         END IF;
625         
626         IF NOT ingest_success THEN
627             PERFORM * FROM config.internal_flag WHERE name = 'ingest.queued.abort_on_error' AND enabled;
628             IF FOUND THEN
629                 RAISE EXCEPTION 'Ingest of %.record_entry % failed', TG_TABLE_SCHEMA, NEW.id;
630             ELSE
631                 RAISE WARNING 'Ingest of %.record_entry % failed', TG_TABLE_SCHEMA, NEW.id;
632             END IF;
633         END IF;
634     END IF;
635
636     RETURN NEW;
637 END;
638 $func$ LANGUAGE PLPGSQL;
639
640 DROP TRIGGER aaa_indexing_ingest_or_delete ON biblio.record_entry;
641 DROP TRIGGER aaa_auth_ingest_or_delete ON authority.record_entry;
642
643 CREATE TRIGGER aaa_indexing_ingest_or_delete AFTER INSERT OR UPDATE ON biblio.record_entry FOR EACH ROW EXECUTE PROCEDURE evergreen.indexing_ingest_or_delete ();
644 CREATE TRIGGER aaa_auth_ingest_or_delete AFTER INSERT OR UPDATE ON authority.record_entry FOR EACH ROW EXECUTE PROCEDURE evergreen.indexing_ingest_or_delete ();
645
646 CREATE OR REPLACE FUNCTION metabib.reingest_record_attributes (rid BIGINT, pattr_list TEXT[] DEFAULT NULL, prmarc TEXT DEFAULT NULL, rdeleted BOOL DEFAULT TRUE) RETURNS VOID AS $func$
647 DECLARE
648     transformed_xml TEXT;
649     rmarc           TEXT := prmarc;
650     tmp_val         TEXT;
651     prev_xfrm       TEXT;
652     normalizer      RECORD;
653     xfrm            config.xml_transform%ROWTYPE;
654     attr_vector     INT[] := '{}'::INT[];
655     attr_vector_tmp INT[];
656     attr_list       TEXT[] := pattr_list;
657     attr_value      TEXT[];
658     norm_attr_value TEXT[];
659     tmp_xml         TEXT;
660     tmp_array       TEXT[];
661     attr_def        config.record_attr_definition%ROWTYPE;
662     ccvm_row        config.coded_value_map%ROWTYPE;
663     jump_past       BOOL;
664 BEGIN
665
666     IF attr_list IS NULL OR rdeleted THEN -- need to do the full dance on INSERT or undelete
667         SELECT ARRAY_AGG(name) INTO attr_list FROM config.record_attr_definition
668         WHERE (
669             tag IS NOT NULL OR
670             fixed_field IS NOT NULL OR
671             xpath IS NOT NULL OR
672             phys_char_sf IS NOT NULL OR
673             composite
674         ) AND (
675             filter OR sorter
676         );
677     END IF;
678
679     IF rmarc IS NULL THEN
680         SELECT marc INTO rmarc FROM biblio.record_entry WHERE id = rid;
681     END IF;
682
683     FOR attr_def IN SELECT * FROM config.record_attr_definition WHERE NOT composite AND name = ANY( attr_list ) ORDER BY format LOOP
684
685         jump_past := FALSE; -- This gets set when we are non-multi and have found something
686         attr_value := '{}'::TEXT[];
687         norm_attr_value := '{}'::TEXT[];
688         attr_vector_tmp := '{}'::INT[];
689
690         SELECT * INTO ccvm_row FROM config.coded_value_map c WHERE c.ctype = attr_def.name LIMIT 1;
691
692         IF attr_def.tag IS NOT NULL THEN -- tag (and optional subfield list) selection
693             SELECT  ARRAY_AGG(value) INTO attr_value
694               FROM  (SELECT * FROM metabib.full_rec ORDER BY tag, subfield) AS x
695               WHERE record = rid
696                     AND tag LIKE attr_def.tag
697                     AND CASE
698                         WHEN attr_def.sf_list IS NOT NULL
699                             THEN POSITION(subfield IN attr_def.sf_list) > 0
700                         ELSE TRUE
701                     END
702               GROUP BY tag
703               ORDER BY tag;
704
705             IF NOT attr_def.multi THEN
706                 attr_value := ARRAY[ARRAY_TO_STRING(attr_value, COALESCE(attr_def.joiner,' '))];
707                 jump_past := TRUE;
708             END IF;
709         END IF;
710
711         IF NOT jump_past AND attr_def.fixed_field IS NOT NULL THEN -- a named fixed field, see config.marc21_ff_pos_map.fixed_field
712             attr_value := attr_value || vandelay.marc21_extract_fixed_field_list(rmarc, attr_def.fixed_field);
713
714             IF NOT attr_def.multi THEN
715                 attr_value := ARRAY[attr_value[1]];
716                 jump_past := TRUE;
717             END IF;
718         END IF;
719
720         IF NOT jump_past AND attr_def.xpath IS NOT NULL THEN -- and xpath expression
721
722             SELECT INTO xfrm * FROM config.xml_transform WHERE name = attr_def.format;
723
724             -- See if we can skip the XSLT ... it's expensive
725             IF prev_xfrm IS NULL OR prev_xfrm <> xfrm.name THEN
726                 -- Can't skip the transform
727                 IF xfrm.xslt <> '---' THEN
728                     transformed_xml := oils_xslt_process(rmarc,xfrm.xslt);
729                 ELSE
730                     transformed_xml := rmarc;
731                 END IF;
732
733                 prev_xfrm := xfrm.name;
734             END IF;
735
736             IF xfrm.name IS NULL THEN
737                 -- just grab the marcxml (empty) transform
738                 SELECT INTO xfrm * FROM config.xml_transform WHERE xslt = '---' LIMIT 1;
739                 prev_xfrm := xfrm.name;
740             END IF;
741
742             FOR tmp_xml IN SELECT UNNEST(oils_xpath(attr_def.xpath, transformed_xml, ARRAY[ARRAY[xfrm.prefix, xfrm.namespace_uri]])) LOOP
743                 tmp_val := oils_xpath_string(
744                                 '//*',
745                                 tmp_xml,
746                                 COALESCE(attr_def.joiner,' '),
747                                 ARRAY[ARRAY[xfrm.prefix, xfrm.namespace_uri]]
748                             );
749                 IF tmp_val IS NOT NULL AND BTRIM(tmp_val) <> '' THEN
750                     attr_value := attr_value || tmp_val;
751                     EXIT WHEN NOT attr_def.multi;
752                 END IF;
753             END LOOP;
754         END IF;
755
756         IF NOT jump_past AND attr_def.phys_char_sf IS NOT NULL THEN -- a named Physical Characteristic, see config.marc21_physical_characteristic_*_map
757             SELECT  ARRAY_AGG(m.value) INTO tmp_array
758               FROM  vandelay.marc21_physical_characteristics(rmarc) v
759                     LEFT JOIN config.marc21_physical_characteristic_value_map m ON (m.id = v.value)
760               WHERE v.subfield = attr_def.phys_char_sf AND (m.value IS NOT NULL AND BTRIM(m.value) <> '')
761                     AND ( ccvm_row.id IS NULL OR ( ccvm_row.id IS NOT NULL AND v.id IS NOT NULL) );
762
763             attr_value := attr_value || tmp_array;
764
765             IF NOT attr_def.multi THEN
766                 attr_value := ARRAY[attr_value[1]];
767             END IF;
768
769         END IF;
770
771                 -- apply index normalizers to attr_value
772         FOR tmp_val IN SELECT value FROM UNNEST(attr_value) x(value) LOOP
773             FOR normalizer IN
774                 SELECT  n.func AS func,
775                         n.param_count AS param_count,
776                         m.params AS params
777                   FROM  config.index_normalizer n
778                         JOIN config.record_attr_index_norm_map m ON (m.norm = n.id)
779                   WHERE attr = attr_def.name
780                   ORDER BY m.pos LOOP
781                     EXECUTE 'SELECT ' || normalizer.func || '(' ||
782                     COALESCE( quote_literal( tmp_val ), 'NULL' ) ||
783                         CASE
784                             WHEN normalizer.param_count > 0
785                                 THEN ',' || REPLACE(REPLACE(BTRIM(normalizer.params,'[]'),E'\'',E'\\\''),E'"',E'\'')
786                                 ELSE ''
787                             END ||
788                     ')' INTO tmp_val;
789
790             END LOOP;
791             IF tmp_val IS NOT NULL AND tmp_val <> '' THEN
792                 -- note that a string that contains only blanks
793                 -- is a valid value for some attributes
794                 norm_attr_value := norm_attr_value || tmp_val;
795             END IF;
796         END LOOP;
797
798         IF attr_def.filter THEN
799             -- Create unknown uncontrolled values and find the IDs of the values
800             IF ccvm_row.id IS NULL THEN
801                 FOR tmp_val IN SELECT value FROM UNNEST(norm_attr_value) x(value) LOOP
802                     IF tmp_val IS NOT NULL AND BTRIM(tmp_val) <> '' THEN
803                         BEGIN -- use subtransaction to isolate unique constraint violations
804                             INSERT INTO metabib.uncontrolled_record_attr_value ( attr, value ) VALUES ( attr_def.name, tmp_val );
805                         EXCEPTION WHEN unique_violation THEN END;
806                     END IF;
807                 END LOOP;
808
809                 SELECT ARRAY_AGG(id) INTO attr_vector_tmp FROM metabib.uncontrolled_record_attr_value WHERE attr = attr_def.name AND value = ANY( norm_attr_value );
810             ELSE
811                 SELECT ARRAY_AGG(id) INTO attr_vector_tmp FROM config.coded_value_map WHERE ctype = attr_def.name AND code = ANY( norm_attr_value );
812             END IF;
813
814             -- Add the new value to the vector
815             attr_vector := attr_vector || attr_vector_tmp;
816         END IF;
817
818         IF attr_def.sorter THEN
819             DELETE FROM metabib.record_sorter WHERE source = rid AND attr = attr_def.name;
820             IF norm_attr_value[1] IS NOT NULL THEN
821                 INSERT INTO metabib.record_sorter (source, attr, value) VALUES (rid, attr_def.name, norm_attr_value[1]);
822             END IF;
823         END IF;
824
825     END LOOP;
826
827 /* We may need to rewrite the vlist to contain
828    the intersection of new values for requested
829    attrs and old values for ignored attrs. To
830    do this, we take the old attr vlist and
831    subtract any values that are valid for the
832    requested attrs, and then add back the new
833    set of attr values. */
834
835     IF ARRAY_LENGTH(pattr_list, 1) > 0 THEN
836         SELECT vlist INTO attr_vector_tmp FROM metabib.record_attr_vector_list WHERE source = rid;
837         SELECT attr_vector_tmp - ARRAY_AGG(id::INT) INTO attr_vector_tmp FROM metabib.full_attr_id_map WHERE attr = ANY (pattr_list);
838         attr_vector := attr_vector || attr_vector_tmp;
839     END IF;
840
841     -- On to composite attributes, now that the record attrs have been pulled.  Processed in name order, so later composite
842     -- attributes can depend on earlier ones.
843     PERFORM metabib.compile_composite_attr_cache_init();
844     FOR attr_def IN SELECT * FROM config.record_attr_definition WHERE composite AND name = ANY( attr_list ) ORDER BY name LOOP
845
846         FOR ccvm_row IN SELECT * FROM config.coded_value_map c WHERE c.ctype = attr_def.name ORDER BY value LOOP
847
848             tmp_val := metabib.compile_composite_attr( ccvm_row.id );
849             CONTINUE WHEN tmp_val IS NULL OR tmp_val = ''; -- nothing to do
850
851             IF attr_def.filter THEN
852                 IF attr_vector @@ tmp_val::query_int THEN
853                     attr_vector = attr_vector + intset(ccvm_row.id);
854                     EXIT WHEN NOT attr_def.multi;
855                 END IF;
856             END IF;
857
858             IF attr_def.sorter THEN
859                 IF attr_vector @@ tmp_val THEN
860                     DELETE FROM metabib.record_sorter WHERE source = rid AND attr = attr_def.name;
861                     INSERT INTO metabib.record_sorter (source, attr, value) VALUES (rid, attr_def.name, ccvm_row.code);
862                 END IF;
863             END IF;
864
865         END LOOP;
866
867     END LOOP;
868
869     IF ARRAY_LENGTH(attr_vector, 1) > 0 THEN
870         INSERT INTO metabib.record_attr_vector_list (source, vlist) VALUES (rid, attr_vector)
871             ON CONFLICT (source) DO UPDATE SET vlist = EXCLUDED.vlist;
872     END IF;
873
874 END;
875
876 $func$ LANGUAGE PLPGSQL;
877
878 CREATE OR REPLACE FUNCTION authority.propagate_changes
879     (aid BIGINT, bid BIGINT) RETURNS BIGINT AS $func$
880 DECLARE
881     queuing_success BOOL := FALSE;
882 BEGIN
883
884     PERFORM 1 FROM config.global_flag
885         WHERE name IN ('ingest.queued.all','ingest.queued.authority.propagate')
886             AND enabled;
887
888     IF FOUND THEN
889         -- XXX enqueue special 'propagate' bib action
890         SELECT action.enqueue_ingest_entry( bid, 'biblio', NOW(), 'propagate', aid::TEXT) INTO queuing_success;
891
892         IF queuing_success THEN
893             RETURN aid;
894         END IF;
895     END IF;
896
897     PERFORM authority.apply_propagate_changes(aid, bid);
898     RETURN aid;
899 END;
900 $func$ LANGUAGE PLPGSQL;
901
902 CREATE OR REPLACE FUNCTION authority.apply_propagate_changes
903     (aid BIGINT, bid BIGINT) RETURNS BIGINT AS $func$
904 DECLARE
905     bib_forced  BOOL := FALSE;
906     bib_rec     biblio.record_entry%ROWTYPE;
907     new_marc    TEXT;
908 BEGIN
909
910     SELECT INTO bib_rec * FROM biblio.record_entry WHERE id = bid;
911
912     new_marc := vandelay.merge_record_xml(
913         bib_rec.marc, authority.generate_overlay_template(aid));
914
915     IF new_marc = bib_rec.marc THEN
916         -- Authority record change had no impact on this bib record.
917         -- Nothing left to do.
918         RETURN aid;
919     END IF;
920
921     PERFORM 1 FROM config.global_flag
922         WHERE name = 'ingest.disable_authority_auto_update_bib_meta'
923             AND enabled;
924
925     IF NOT FOUND THEN
926         -- update the bib record editor and edit_date
927         bib_rec.editor := (
928             SELECT editor FROM authority.record_entry WHERE id = aid);
929         bib_rec.edit_date = NOW();
930     END IF;
931
932     PERFORM action.set_queued_ingest_force('ingest.queued.biblio.update.disabled');
933
934     UPDATE biblio.record_entry SET
935         marc = new_marc,
936         editor = bib_rec.editor,
937         edit_date = bib_rec.edit_date
938     WHERE id = bid;
939
940     PERFORM action.clear_queued_ingest_force();
941
942     RETURN aid;
943
944 END;
945 $func$ LANGUAGE PLPGSQL;
946
947 CREATE OR REPLACE FUNCTION metabib.reingest_metabib_field_entries(
948     bib_id BIGINT,
949     skip_facet BOOL DEFAULT FALSE,
950     skip_display BOOL DEFAULT FALSE,
951     skip_browse BOOL DEFAULT FALSE,
952     skip_search BOOL DEFAULT FALSE,
953     only_fields INT[] DEFAULT '{}'::INT[]
954 ) RETURNS VOID AS $func$
955 DECLARE
956     fclass          RECORD;
957     ind_data        metabib.field_entry_template%ROWTYPE;
958     mbe_row         metabib.browse_entry%ROWTYPE;
959     mbe_id          BIGINT;
960     b_skip_facet    BOOL;
961     b_skip_display    BOOL;
962     b_skip_browse   BOOL;
963     b_skip_search   BOOL;
964     value_prepped   TEXT;
965     field_list      INT[] := only_fields;
966     field_types     TEXT[] := '{}'::TEXT[];
967 BEGIN
968
969     IF field_list = '{}'::INT[] THEN
970         SELECT ARRAY_AGG(id) INTO field_list FROM config.metabib_field;
971     END IF;
972
973     SELECT COALESCE(NULLIF(skip_facet, FALSE), EXISTS (SELECT enabled FROM config.internal_flag WHERE name =  'ingest.skip_facet_indexing' AND enabled)) INTO b_skip_facet;
974     SELECT COALESCE(NULLIF(skip_display, FALSE), EXISTS (SELECT enabled FROM config.internal_flag WHERE name =  'ingest.skip_display_indexing' AND enabled)) INTO b_skip_display;
975     SELECT COALESCE(NULLIF(skip_browse, FALSE), EXISTS (SELECT enabled FROM config.internal_flag WHERE name =  'ingest.skip_browse_indexing' AND enabled)) INTO b_skip_browse;
976     SELECT COALESCE(NULLIF(skip_search, FALSE), EXISTS (SELECT enabled FROM config.internal_flag WHERE name =  'ingest.skip_search_indexing' AND enabled)) INTO b_skip_search;
977
978     IF NOT b_skip_facet THEN field_types := field_types || '{facet}'; END IF;
979     IF NOT b_skip_display THEN field_types := field_types || '{display}'; END IF;
980     IF NOT b_skip_browse THEN field_types := field_types || '{browse}'; END IF;
981     IF NOT b_skip_search THEN field_types := field_types || '{search}'; END IF;
982
983     PERFORM * FROM config.internal_flag WHERE name = 'ingest.assume_inserts_only' AND enabled;
984     IF NOT FOUND THEN
985         IF NOT b_skip_search THEN
986             FOR fclass IN SELECT * FROM config.metabib_class LOOP
987                 EXECUTE $$DELETE FROM metabib.$$ || fclass.name || $$_field_entry WHERE source = $$ || bib_id || $$ AND field = ANY($1)$$ USING field_list;
988             END LOOP;
989         END IF;
990         IF NOT b_skip_facet THEN
991             DELETE FROM metabib.facet_entry WHERE source = bib_id AND field = ANY(field_list);
992         END IF;
993         IF NOT b_skip_display THEN
994             DELETE FROM metabib.display_entry WHERE source = bib_id AND field = ANY(field_list);
995         END IF;
996         IF NOT b_skip_browse THEN
997             DELETE FROM metabib.browse_entry_def_map WHERE source = bib_id AND def = ANY(field_list);
998         END IF;
999     END IF;
1000
1001     FOR ind_data IN SELECT * FROM biblio.extract_metabib_field_entry( bib_id, ' ', field_types, field_list ) LOOP
1002
1003         -- don't store what has been normalized away
1004         CONTINUE WHEN ind_data.value IS NULL;
1005
1006         IF ind_data.field < 0 THEN
1007             ind_data.field = -1 * ind_data.field;
1008         END IF;
1009
1010         IF ind_data.facet_field AND NOT b_skip_facet THEN
1011             INSERT INTO metabib.facet_entry (field, source, value)
1012                 VALUES (ind_data.field, ind_data.source, ind_data.value);
1013         END IF;
1014
1015         IF ind_data.display_field AND NOT b_skip_display THEN
1016             INSERT INTO metabib.display_entry (field, source, value)
1017                 VALUES (ind_data.field, ind_data.source, ind_data.value);
1018         END IF;
1019
1020
1021         IF ind_data.browse_field AND NOT b_skip_browse THEN
1022             -- A caveat about this SELECT: this should take care of replacing
1023             -- old mbe rows when data changes, but not if normalization (by
1024             -- which I mean specifically the output of
1025             -- evergreen.oils_tsearch2()) changes.  It may or may not be
1026             -- expensive to add a comparison of index_vector to index_vector
1027             -- to the WHERE clause below.
1028
1029             CONTINUE WHEN ind_data.sort_value IS NULL;
1030
1031             value_prepped := metabib.browse_normalize(ind_data.value, ind_data.field);
1032             IF ind_data.browse_nocase THEN -- for "nocase" browse definions, look for a preexisting row that matches case-insensitively on value and use that
1033                 SELECT INTO mbe_row * FROM metabib.browse_entry
1034                     WHERE evergreen.lowercase(value) = evergreen.lowercase(value_prepped) AND sort_value = ind_data.sort_value
1035                     ORDER BY sort_value, value LIMIT 1; -- gotta pick something, I guess
1036             END IF;
1037
1038             IF mbe_row.id IS NOT NULL THEN -- asked to check for, and found, a "nocase" version to use
1039                 mbe_id := mbe_row.id;
1040             ELSE -- otherwise, an UPSERT-protected variant
1041                 INSERT INTO metabib.browse_entry
1042                     ( value, sort_value ) VALUES
1043                     ( value_prepped, ind_data.sort_value )
1044                   ON CONFLICT (sort_value, value) DO UPDATE SET sort_value = EXCLUDED.sort_value -- must update a row to return an existing id
1045                   RETURNING id INTO mbe_id;
1046             END IF;
1047
1048             INSERT INTO metabib.browse_entry_def_map (entry, def, source, authority)
1049                 VALUES (mbe_id, ind_data.field, ind_data.source, ind_data.authority);
1050         END IF;
1051
1052         IF ind_data.search_field AND NOT b_skip_search THEN
1053             -- Avoid inserting duplicate rows
1054             EXECUTE 'SELECT 1 FROM metabib.' || ind_data.field_class ||
1055                 '_field_entry WHERE field = $1 AND source = $2 AND value = $3'
1056                 INTO mbe_id USING ind_data.field, ind_data.source, ind_data.value;
1057                 -- RAISE NOTICE 'Search for an already matching row returned %', mbe_id;
1058             IF mbe_id IS NULL THEN
1059                 EXECUTE $$
1060                 INSERT INTO metabib.$$ || ind_data.field_class || $$_field_entry (field, source, value)
1061                     VALUES ($$ ||
1062                         quote_literal(ind_data.field) || $$, $$ ||
1063                         quote_literal(ind_data.source) || $$, $$ ||
1064                         quote_literal(ind_data.value) ||
1065                     $$);$$;
1066             END IF;
1067         END IF;
1068
1069     END LOOP;
1070
1071     IF NOT b_skip_search THEN
1072         PERFORM metabib.update_combined_index_vectors(bib_id);
1073         PERFORM * FROM config.internal_flag WHERE name = 'ingest.disable_symspell_reification' AND enabled;
1074         IF NOT FOUND THEN
1075             PERFORM search.symspell_dictionary_reify();
1076         END IF;
1077     END IF;
1078
1079     RETURN;
1080 END;
1081 $func$ LANGUAGE PLPGSQL;
1082
1083 COMMIT;
1084