-- Seed the config.global_flag rows that control Queued Ingest. Each entry
-- supplies the flag name, its initial enabled state, and a label.
-- Only max_threads and the two *.marc_edit_inline flags start out enabled;
-- every other flag listed here is seeded as FALSE.
3 INSERT INTO config.global_flag (name, enabled, label) VALUES (
4 'ingest.queued.max_threads', TRUE,
6 'ingest.queued.max_threads',
7 'Queued Ingest: Maximum number of database workers allowed for queued ingest processes',
11 'ingest.queued.abort_on_error', FALSE,
13 'ingest.queued.abort_on_error',
14 'Queued Ingest: Abort transaction on ingest error rather than simply logging an error',
18 'ingest.queued.authority.propagate', FALSE,
20 'ingest.queued.authority.propagate',
21 'Queued Ingest: Queue all bib record updates on authority change propagation, even if bib queuing is not generally enabled',
25 'ingest.queued.all', FALSE,
28 'Queued Ingest: Use Queued Ingest for all bib and authority record ingest',
32 'ingest.queued.biblio.all', FALSE,
34 'ingest.queued.biblio.all',
35 'Queued Ingest: Use Queued Ingest for all bib record ingest',
39 'ingest.queued.authority.all', FALSE,
41 'ingest.queued.authority.all',
42 'Queued Ingest: Use Queued Ingest for all authority record ingest',
46 'ingest.queued.biblio.insert.marc_edit_inline', TRUE,
48 'ingest.queued.biblio.insert.marc_edit_inline',
49 'Queued Ingest: Do NOT use Queued Ingest when creating a new bib, or undeleting a bib, via the MARC editor',
53 'ingest.queued.biblio.insert', FALSE,
55 'ingest.queued.biblio.insert',
56 'Queued Ingest: Use Queued Ingest for bib record ingest on insert and undelete',
60 'ingest.queued.authority.insert', FALSE,
62 'ingest.queued.authority.insert',
63 'Queued Ingest: Use Queued Ingest for authority record ingest on insert and undelete',
67 'ingest.queued.biblio.update.marc_edit_inline', TRUE,
69 'ingest.queued.biblio.update.marc_edit_inline',
70 'Queued Ingest: Do NOT Use Queued Ingest when editing bib records via the MARC Editor',
74 'ingest.queued.biblio.update', FALSE,
76 'ingest.queued.biblio.update',
77 'Queued Ingest: Use Queued Ingest for bib record ingest on update',
81 'ingest.queued.authority.update', FALSE,
83 'ingest.queued.authority.update',
84 'Queued Ingest: Use Queued Ingest for authority record ingest on update',
88 'ingest.queued.biblio.delete', FALSE,
90 'ingest.queued.biblio.delete',
91 'Queued Ingest: Use Queued Ingest for bib record ingest on delete',
95 'ingest.queued.authority.delete', FALSE,
97 'ingest.queued.authority.delete',
98 'Queued Ingest: Use Queued Ingest for authority record ingest on delete',
-- max_threads also carries a value in addition to its enabled state; it is set to '20' here.
104 UPDATE config.global_flag SET value = '20' WHERE name = 'ingest.queued.max_threads';
-- A named batch of ingest work. Rows in action.ingest_queue_entry refer to a
-- queue through their "queue" column.
106 CREATE TABLE action.ingest_queue (
107 id SERIAL PRIMARY KEY,
108 created TIMESTAMPTZ NOT NULL DEFAULT NOW(),
109 run_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
-- who: optional link to actor.usr; set to NULL if that user row is deleted
110 who INT REFERENCES actor.usr (id) ON UPDATE CASCADE ON DELETE SET NULL DEFERRABLE INITIALLY DEFERRED,
-- start_time / end_time are nullable; enqueue_ingest_entry treats a queue with a NULL end_time as still open
111 start_time TIMESTAMPTZ,
112 end_time TIMESTAMPTZ,
-- One pending or completed ingest request for a single record.
117 CREATE TABLE action.ingest_queue_entry (
118 id BIGSERIAL PRIMARY KEY,
119 record BIGINT NOT NULL, -- points to a record id of the appropriate record_type
120 record_type TEXT NOT NULL,
121 action TEXT NOT NULL,
122 run_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
123 state_data TEXT NOT NULL DEFAULT '',
-- queue: optional owning queue; entries are removed when their queue is deleted
124 queue INT REFERENCES action.ingest_queue (id) ON UPDATE CASCADE ON DELETE CASCADE DEFERRABLE INITIALLY DEFERRED,
-- override_by: self-reference to another entry; reset to NULL if that entry is deleted
125 override_by BIGINT REFERENCES action.ingest_queue_entry (id) ON UPDATE CASCADE ON DELETE SET NULL DEFERRABLE INITIALLY DEFERRED,
126 ingest_time TIMESTAMPTZ,
127 fail_time TIMESTAMPTZ
-- At most one entry per (record_type, record, state_data) may be both un-ingested and not overridden.
129 CREATE UNIQUE INDEX record_pending_once ON action.ingest_queue_entry (record_type,record,state_data) WHERE ingest_time IS NULL AND override_by IS NULL;
-- Partial index covering only rows that have an override_by value.
130 CREATE INDEX entry_override_by_idx ON action.ingest_queue_entry (override_by) WHERE override_by IS NOT NULL;
-- Add an ingest request for one record to action.ingest_queue_entry.
--
-- Parameters:
--   record_id       id of the record to ingest
--   rtype           record type, stored in record_type ('biblio' by default)
--   when_to_run     run_at for a brand new entry; NULL is treated as NOW()
--   queue_id        optional action.ingest_queue id to attach the entry to
--   ingest_action   stored in the entry's action column
--   old_state_data  stored in state_data; also part of the "same request" match
--
-- Returns TRUE when the request was recorded (or was already covered by an
-- existing pending entry), FALSE when anything raised an error; the error is
-- reported as a WARNING rather than propagated.
--
-- When a matching pending entry already exists, the new row is inserted with
-- override_by pointing at it, so it is treated as already handled. For a
-- 'delete' action the relationship is flipped afterwards so that the new
-- delete entry becomes the live one.
--
-- NOTE(review): the run_at source columns of the two "existing" CTEs and the
-- "qe.queue IS NULL" filter should be confirmed against the deployed definition.
CREATE OR REPLACE FUNCTION action.enqueue_ingest_entry (
    record_id       BIGINT,
    rtype           TEXT DEFAULT 'biblio',
    when_to_run     TIMESTAMPTZ DEFAULT NOW(),
    queue_id        INT DEFAULT NULL,
    ingest_action   TEXT DEFAULT 'update', -- will be the most common?
    old_state_data  TEXT DEFAULT ''
) RETURNS BOOL AS $F$
DECLARE
    new_entry       action.ingest_queue_entry%ROWTYPE;
    prev_del_entry  action.ingest_queue_entry%ROWTYPE;
    diag_detail     TEXT;
    diag_context    TEXT;
BEGIN

    IF ingest_action = 'delete' THEN
        -- first see if there is an outstanding entry
        -- (the record_pending_once unique index guarantees at most one match).
        -- The table must carry the qe alias and the column is state_data;
        -- without both, this statement errors and every delete enqueue fails.
        SELECT  qe.* INTO prev_del_entry
          FROM  action.ingest_queue_entry qe
          WHERE qe.record = record_id
                AND qe.state_data = old_state_data
                AND qe.record_type = rtype
                AND qe.ingest_time IS NULL
                AND qe.override_by IS NULL;
    END IF;

    WITH existing_queue_entry_cte AS (
        -- a pending entry for this record that belongs to a still-open queue
        SELECT  queue_id AS queue,
                rtype AS record_type,
                record_id AS record,
                qe.id AS override_by,
                ingest_action AS action,
                q.run_at AS run_at,
                old_state_data AS state_data
          FROM  action.ingest_queue_entry qe
                JOIN action.ingest_queue q ON (qe.queue = q.id)
          WHERE qe.record = record_id
                AND q.end_time IS NULL
                AND qe.record_type = rtype
                AND qe.state_data = old_state_data
                AND qe.ingest_time IS NULL
                AND qe.fail_time IS NULL
                AND qe.override_by IS NULL
    ), existing_nonqueue_entry_cte AS (
        -- a pending entry for this record that is not attached to any queue
        SELECT  queue_id AS queue,
                rtype AS record_type,
                record_id AS record,
                qe.id AS override_by,
                ingest_action AS action,
                qe.run_at AS run_at,
                old_state_data AS state_data
          FROM  action.ingest_queue_entry qe
          WHERE qe.record = record_id
                AND qe.queue IS NULL
                AND qe.record_type = rtype
                AND qe.state_data = old_state_data
                AND qe.ingest_time IS NULL
                AND qe.fail_time IS NULL
                AND qe.override_by IS NULL
    ), new_entry_cte AS (
        SELECT * FROM existing_queue_entry_cte
          UNION ALL
        SELECT * FROM existing_nonqueue_entry_cte
          UNION ALL
        -- fallback candidate: a fresh entry that overrides nothing
        SELECT queue_id, rtype, record_id, NULL, ingest_action, COALESCE(when_to_run,NOW()), old_state_data
    ), insert_entry_cte AS (
        INSERT INTO action.ingest_queue_entry
            (queue, record_type, record, override_by, action, run_at, state_data)
          SELECT queue, record_type, record, override_by, action, run_at, state_data FROM new_entry_cte
            -- prefer a candidate that points at an existing entry, earliest run_at first
            ORDER BY override_by NULLS LAST, run_at
            LIMIT 1
        RETURNING *
    ) SELECT * INTO new_entry FROM insert_entry_cte;

    IF prev_del_entry.id IS NOT NULL THEN -- later delete overrides earlier unapplied entry
        UPDATE  action.ingest_queue_entry
          SET   override_by = new_entry.id
          WHERE id = prev_del_entry.id;

        UPDATE  action.ingest_queue_entry
          SET   override_by = NULL
          WHERE id = new_entry.id;

    ELSIF new_entry.override_by IS NOT NULL THEN
        RETURN TRUE; -- already handled, don't notify
    END IF;

    NOTIFY queued_ingest;

    RETURN TRUE;
EXCEPTION WHEN OTHERS THEN
    GET STACKED DIAGNOSTICS diag_detail  = PG_EXCEPTION_DETAIL,
                            diag_context = PG_EXCEPTION_CONTEXT;
    RAISE WARNING '%\n%', diag_detail, diag_context;
    RETURN FALSE;
END;
$F$ LANGUAGE PLPGSQL;
-- Run the ingest described by one action.ingest_queue_entry row (qeid).
-- An entry that already has an ingest_time, or that has been overridden by
-- another entry, is reported as done (TRUE) without further work.
-- Otherwise the work is dispatched on the entry's action and record_type to
-- the matching indexing_delete / indexing_update function, or, for a biblio
-- 'propagate' entry, to authority.apply_propagate_changes.
-- The value handed back is the ingest_success result of that call.
230 CREATE OR REPLACE FUNCTION action.process_ingest_queue_entry (qeid BIGINT) RETURNS BOOL AS $func$
232 ingest_success BOOL := NULL;
233 qe action.ingest_queue_entry%ROWTYPE;
236 SELECT * INTO qe FROM action.ingest_queue_entry WHERE id = qeid;
237 IF qe.ingest_time IS NOT NULL OR qe.override_by IS NOT NULL THEN
238 RETURN TRUE; -- Already done
241 IF qe.action = 'delete' THEN
242 IF qe.record_type = 'biblio' THEN
243 SELECT metabib.indexing_delete(r.*, qe.state_data) INTO ingest_success FROM biblio.record_entry r WHERE r.id = qe.record;
244 ELSIF qe.record_type = 'authority' THEN
245 SELECT authority.indexing_delete(r.*, qe.state_data) INTO ingest_success FROM authority.record_entry r WHERE r.id = qe.record;
248 IF qe.record_type = 'biblio' THEN
249 IF qe.action = 'propagate' THEN
-- for 'propagate' entries state_data holds the authority record id and record is the bib id
250 SELECT authority.apply_propagate_changes(qe.state_data::BIGINT, qe.record) INTO ingest_success;
-- insert_only is TRUE only when the queued action is 'insert'
252 SELECT metabib.indexing_update(r.*, qe.action = 'insert', qe.state_data) INTO ingest_success FROM biblio.record_entry r WHERE r.id = qe.record;
254 ELSIF qe.record_type = 'authority' THEN
255 SELECT authority.indexing_update(r.*, qe.action = 'insert', qe.state_data) INTO ingest_success FROM authority.record_entry r WHERE r.id = qe.record;
-- On failure: stamp fail_time, then raise an exception or just a warning
-- (presumably chosen by the ingest.queued.abort_on_error flag looked up below).
259 IF NOT ingest_success THEN
260 UPDATE action.ingest_queue_entry SET fail_time = NOW() WHERE id = qe.id;
261 PERFORM * FROM config.internal_flag WHERE name = 'ingest.queued.abort_on_error' AND enabled;
263 RAISE EXCEPTION 'Ingest action of % on %.record_entry % for queue entry % failed', qe.action, qe.record_type, qe.record, qe.id;
265 RAISE WARNING 'Ingest action of % on %.record_entry % for queue entry % failed', qe.action, qe.record_type, qe.record, qe.id;
-- stamping ingest_time marks the entry as processed
268 UPDATE action.ingest_queue_entry SET ingest_time = NOW() WHERE id = qe.id;
271 RETURN ingest_success;
273 $func$ LANGUAGE PLPGSQL;
-- Trigger function: once a row has an ingest_time, copy that same ingest_time
-- onto every entry whose override_by points at the row.
276 CREATE OR REPLACE FUNCTION action.complete_duplicated_entries () RETURNS TRIGGER AS $F$
278 IF NEW.ingest_time IS NOT NULL THEN
279 UPDATE action.ingest_queue_entry SET ingest_time = NEW.ingest_time WHERE override_by = NEW.id;
284 $F$ LANGUAGE PLPGSQL;
-- Runs action.complete_duplicated_entries() after each row update on
-- action.ingest_queue_entry, but only for rows that are not themselves
-- overridden (override_by IS NULL).
286 CREATE TRIGGER complete_duplicated_entries_trigger
287 AFTER UPDATE ON action.ingest_queue_entry
288 FOR EACH ROW WHEN (NEW.override_by IS NULL)
289 EXECUTE PROCEDURE action.complete_duplicated_entries();
-- Six small helpers that keep two values in the %_SHARED hash (these look like
-- PL/Perl bodies; the LANGUAGE clauses are not visible here - TODO confirm):
--   "ingest_queue_id"    - set_ingest_queue(INT) / get_ingest_queue() / clear_ingest_queue()
--   "ingest_queue_force" - set_queued_ingest_force(TEXT) / get_queued_ingest_force() / clear_queued_ingest_force()
-- Each setter stores its first argument, each getter returns the stored value,
-- and each clear function deletes the key.
291 CREATE OR REPLACE FUNCTION action.set_ingest_queue(INT) RETURNS VOID AS $$
292 $_SHARED{"ingest_queue_id"} = $_[0];
295 CREATE OR REPLACE FUNCTION action.get_ingest_queue() RETURNS INT AS $$
296 return $_SHARED{"ingest_queue_id"};
299 CREATE OR REPLACE FUNCTION action.clear_ingest_queue() RETURNS VOID AS $$
300 delete($_SHARED{"ingest_queue_id"});
303 CREATE OR REPLACE FUNCTION action.set_queued_ingest_force(TEXT) RETURNS VOID AS $$
304 $_SHARED{"ingest_queue_force"} = $_[0];
307 CREATE OR REPLACE FUNCTION action.get_queued_ingest_force() RETURNS TEXT AS $$
308 return $_SHARED{"ingest_queue_force"};
311 CREATE OR REPLACE FUNCTION action.clear_queued_ingest_force() RETURNS VOID AS $$
312 delete($_SHARED{"ingest_queue_force"});
315 ------------------ ingest functions ------------------
-- Clean up derived data for a bib record that is being deleted: metarecord
-- mapping, the attribute vector list, authority links, peer-bib copy maps and
-- browse entry mappings.
--   bib    the biblio.record_entry row being deleted
--   extra  optional text argument (callers pass the queue entry's state_data)
-- Errors are caught and reported as a WARNING instead of being propagated.
317 CREATE OR REPLACE FUNCTION metabib.indexing_delete (bib biblio.record_entry, extra TEXT DEFAULT NULL) RETURNS BOOL AS $func$
-- the preserve_on_delete flag is consulted before the metarecord remap below
323 PERFORM * FROM config.internal_flag WHERE name = 'ingest.metarecord_mapping.preserve_on_delete' AND enabled;
326 PERFORM metabib.remap_metarecord_for_bib(bib.id, bib.fingerprint, TRUE, tmp_bool);
329 -- One needs to keep these around to support searches
330 -- with the #deleted modifier, so one should turn on the named
331 -- internal flag for that functionality.
332 DELETE FROM metabib.record_attr_vector_list WHERE source = bib.id;
335 DELETE FROM authority.bib_linking WHERE bib = bib.id; -- Avoid updating fields in bibs that are no longer visible
336 DELETE FROM biblio.peer_bib_copy_map WHERE peer_record = bib.id; -- Separate any multi-homed items
337 DELETE FROM metabib.browse_entry_def_map WHERE source = bib.id; -- Don't auto-suggest deleted bibs
340 EXCEPTION WHEN OTHERS THEN
341 GET STACKED DIAGNOSTICS diag_detail = PG_EXCEPTION_DETAIL,
342 diag_context = PG_EXCEPTION_CONTEXT;
343 RAISE WARNING '%\n%', diag_detail, diag_context;
346 $func$ LANGUAGE PLPGSQL;
-- (Re)build the derived index data for one bib record: authority links, the
-- flattened full_rec rows, record attributes, field entries, located URIs and
-- the metarecord mapping.
--   bib          the biblio.record_entry row to ingest
--   insert_only  TRUE for a new (or undeleted) record; selects the
--                skip_on_insert rather than skip_on_update metarecord flag
--   extra        free-form option string. Substrings skip_authority,
--                skip_full_rec, skip_attrs, skip_facet, skip_display,
--                skip_browse, skip_search, skip_luri and skip_mrmap switch the
--                matching stage off; attr(name, ...) and field_list(id, ...)
--                carry lists of attribute names and metabib field ids.
-- Most stages are also skipped when the matching config.internal_flag row is
-- enabled. Errors are caught and reported as a WARNING.
348 CREATE OR REPLACE FUNCTION metabib.indexing_update (bib biblio.record_entry, insert_only BOOL DEFAULT FALSE, extra TEXT DEFAULT NULL) RETURNS BOOL AS $func$
350 skip_facet BOOL := FALSE;
351 skip_display BOOL := FALSE;
352 skip_browse BOOL := FALSE;
353 skip_search BOOL := FALSE;
354 skip_auth BOOL := FALSE;
355 skip_full BOOL := FALSE;
356 skip_attrs BOOL := FALSE;
357 skip_luri BOOL := FALSE;
358 skip_mrmap BOOL := FALSE;
359 only_attrs TEXT[] := NULL;
360 only_fields INT[] := '{}'::INT[];
365 -- Record authority linking
366 SELECT extra LIKE '%skip_authority%' INTO skip_auth;
367 PERFORM * FROM config.internal_flag WHERE name = 'ingest.disable_authority_linking' AND enabled;
368 IF NOT FOUND AND NOT skip_auth THEN
369 PERFORM biblio.map_authority_linking( bib.id, bib.marc );
372 -- Flatten and insert the mfr data
373 SELECT extra LIKE '%skip_full_rec%' INTO skip_full;
374 PERFORM * FROM config.internal_flag WHERE name = 'ingest.disable_metabib_full_rec' AND enabled;
375 IF NOT FOUND AND NOT skip_full THEN
376 PERFORM metabib.reingest_metabib_full_rec(bib.id);
379 -- Now we pull out attribute data, which is dependent on the mfr for all but XPath-based fields
380 SELECT extra LIKE '%skip_attrs%' INTO skip_attrs;
381 PERFORM * FROM config.internal_flag WHERE name = 'ingest.disable_metabib_rec_descriptor' AND enabled;
382 IF NOT FOUND AND NOT skip_attrs THEN
-- an attr(...) group in extra is split into a list (presumably stored in only_attrs)
383 IF extra ~ 'attr\(\s*(\w[ ,\w]*?)\s*\)' THEN
384 SELECT REGEXP_SPLIT_TO_ARRAY(
385 (REGEXP_MATCHES(extra, 'attr\(\s*(\w[ ,\w]*?)\s*\)'))[1],
390 PERFORM metabib.reingest_record_attributes(bib.id, only_attrs, bib.marc, insert_only);
393 -- Gather and insert the field entry data
394 SELECT extra LIKE '%skip_facet%' INTO skip_facet;
395 SELECT extra LIKE '%skip_display%' INTO skip_display;
396 SELECT extra LIKE '%skip_browse%' INTO skip_browse;
397 SELECT extra LIKE '%skip_search%' INTO skip_search;
-- a field_list(...) group in extra restricts the field entry reingest to those field ids
399 IF extra ~ 'field_list\(\s*(\d[ ,\d]+)\s*\)' THEN
400 SELECT REGEXP_SPLIT_TO_ARRAY(
401 (REGEXP_MATCHES(extra, 'field_list\(\s*(\d[ ,\d]+)\s*\)'))[1],
403 )::INT[] INTO only_fields;
-- field entries are reingested unless all four classes are being skipped
406 IF NOT skip_facet OR NOT skip_display OR NOT skip_browse OR NOT skip_search THEN
407 PERFORM metabib.reingest_metabib_field_entries(bib.id, skip_facet, skip_display, skip_browse, skip_search, only_fields);
411 SELECT extra LIKE '%skip_luri%' INTO skip_luri;
412 PERFORM * FROM config.internal_flag WHERE name = 'ingest.disable_located_uri' AND enabled;
413 IF NOT FOUND AND NOT skip_luri THEN PERFORM biblio.extract_located_uris( bib.id, bib.marc, bib.editor ); END IF;
415 -- (re)map metarecord-bib linking
416 SELECT extra LIKE '%skip_mrmap%' INTO skip_mrmap;
417 IF insert_only THEN -- if not deleted and performing an insert, check for the flag
418 PERFORM * FROM config.internal_flag WHERE name = 'ingest.metarecord_mapping.skip_on_insert' AND enabled;
419 IF NOT FOUND AND NOT skip_mrmap THEN
420 PERFORM metabib.remap_metarecord_for_bib( bib.id, bib.fingerprint );
422 ELSE -- we're doing an update, and we're not deleted, remap
423 PERFORM * FROM config.internal_flag WHERE name = 'ingest.metarecord_mapping.skip_on_update' AND enabled;
424 IF NOT FOUND AND NOT skip_mrmap THEN
425 PERFORM metabib.remap_metarecord_for_bib( bib.id, bib.fingerprint );
430 EXCEPTION WHEN OTHERS THEN
431 GET STACKED DIAGNOSTICS diag_detail = PG_EXCEPTION_DETAIL,
432 diag_context = PG_EXCEPTION_CONTEXT;
433 RAISE WARNING '%\n%', diag_detail, diag_context;
436 $func$ LANGUAGE PLPGSQL;
-- Clean up derived data for an authority record that is being deleted:
-- bib links, flattened full_rec rows, simple headings and authority-to-authority
-- links in either direction.
--   auth   the authority.record_entry row being deleted
--   extra  optional text argument; not used by the cleanup itself
-- Returns TRUE on success. Any error is reported as a WARNING and FALSE is
-- returned instead of propagating the exception.
--
-- This is an ordinary function, not a trigger function, so the record must be
-- addressed through the "auth" parameter; NEW is not defined here.
CREATE OR REPLACE FUNCTION authority.indexing_delete (auth authority.record_entry, extra TEXT DEFAULT NULL) RETURNS BOOL AS $func$
DECLARE
    diag_detail     TEXT;
    diag_context    TEXT;
BEGIN
    DELETE FROM authority.bib_linking WHERE authority = auth.id; -- Avoid updating fields in bibs that are no longer visible
    DELETE FROM authority.full_rec WHERE record = auth.id; -- Avoid validating fields against deleted authority records
    DELETE FROM authority.simple_heading WHERE record = auth.id;
      -- Should remove matching $0 from controlled fields at the same time?

    -- XXX What do we do about the actual linking subfields present in
    -- authority records that target this one when this happens?
    DELETE FROM authority.authority_linking WHERE source = auth.id OR target = auth.id;

    RETURN TRUE;
EXCEPTION WHEN OTHERS THEN
    GET STACKED DIAGNOSTICS diag_detail  = PG_EXCEPTION_DETAIL,
                            diag_context = PG_EXCEPTION_CONTEXT;
    RAISE WARNING '%\n%', diag_detail, diag_context;
    RETURN FALSE;
END;
$func$ LANGUAGE PLPGSQL;
-- (Re)build the derived data for one authority record: propagate heading
-- changes to linked bibs, refresh authority links and simple headings (with
-- their browse entries), then the flattened full_rec rows and the record
-- descriptor.
--   auth         the authority.record_entry row to ingest
--   insert_only  when FALSE, existing authority_linking and simple_heading
--                rows for the record are deleted before being rebuilt
--   old_heading  the previous heading; propagation only runs when it differs
--                from auth.heading (a NULL old_heading never compares as different)
-- Errors are caught and reported as a WARNING.
463 CREATE OR REPLACE FUNCTION authority.indexing_update (auth authority.record_entry, insert_only BOOL DEFAULT FALSE, old_heading TEXT DEFAULT NULL) RETURNS BOOL AS $func$
465 ashs authority.simple_heading%ROWTYPE;
466 mbe_row metabib.browse_entry%ROWTYPE;
473 -- Unless there's a setting stopping us, propagate these updates to any linked bib records when the heading changes
474 PERFORM * FROM config.internal_flag WHERE name = 'ingest.disable_authority_auto_update' AND enabled;
476 IF NOT FOUND AND auth.heading <> old_heading THEN
477 PERFORM authority.propagate_changes(auth.id);
480 IF NOT insert_only THEN
481 DELETE FROM authority.authority_linking WHERE source = auth.id;
482 DELETE FROM authority.simple_heading WHERE record = auth.id;
485 INSERT INTO authority.authority_linking (source, target, field)
486 SELECT source, target, field FROM authority.calculate_authority_linking(
487 auth.id, auth.control_set, auth.marc::XML
-- one simple_heading row per heading extracted from the MARC, each mapped to a browse entry
490 FOR ashs IN SELECT * FROM authority.simple_heading_set(auth.marc) LOOP
492 INSERT INTO authority.simple_heading (record,atag,value,sort_value,thesaurus)
493 VALUES (ashs.record, ashs.atag, ashs.value, ashs.sort_value, ashs.thesaurus);
494 ash_id := CURRVAL('authority.simple_heading_id_seq'::REGCLASS);
-- look for an existing browse entry with the same value and sort_value
496 SELECT INTO mbe_row * FROM metabib.browse_entry
497 WHERE value = ashs.value AND sort_value = ashs.sort_value;
500 mbe_id := mbe_row.id;
502 INSERT INTO metabib.browse_entry
503 ( value, sort_value ) VALUES
504 ( ashs.value, ashs.sort_value );
506 mbe_id := CURRVAL('metabib.browse_entry_id_seq'::REGCLASS);
509 INSERT INTO metabib.browse_entry_simple_heading_map (entry,simple_heading) VALUES (mbe_id,ash_id);
513 -- Flatten and insert the afr data
514 PERFORM * FROM config.internal_flag WHERE name = 'ingest.disable_authority_full_rec' AND enabled;
516 PERFORM authority.reingest_authority_full_rec(auth.id);
517 PERFORM * FROM config.internal_flag WHERE name = 'ingest.disable_authority_rec_descriptor' AND enabled;
519 PERFORM authority.reingest_authority_rec_descriptor(auth.id);
524 EXCEPTION WHEN OTHERS THEN
525 GET STACKED DIAGNOSTICS diag_detail = PG_EXCEPTION_DETAIL,
526 diag_context = PG_EXCEPTION_CONTEXT;
527 RAISE WARNING '%\n%', diag_detail, diag_context;
530 $func$ LANGUAGE PLPGSQL;
-- Row trigger function used for both biblio.record_entry and
-- authority.record_entry (it branches on TG_TABLE_SCHEMA).
-- It classifies the change as 'insert', 'update' or 'delete', works out
-- whether the ingest should be queued through action.enqueue_ingest_entry,
-- and otherwise calls the matching indexing_delete / indexing_update
-- function directly.
532 CREATE OR REPLACE FUNCTION evergreen.indexing_ingest_or_delete () RETURNS TRIGGER AS $func$
534 old_state_data TEXT := '';
537 queuing_flag_name TEXT;
538 queuing_flag BOOL := FALSE;
539 queuing_success BOOL := FALSE;
540 ingest_success BOOL := FALSE;
544 -- Identify the ingest action type
545 IF TG_OP = 'UPDATE' THEN
547 -- Gather type-specific data for later use
548 IF TG_TABLE_SCHEMA = 'authority' THEN
-- for authority records the previous heading travels along as state data
549 old_state_data = OLD.heading;
552 IF NOT OLD.deleted THEN -- maybe reingest?
554 new_action = 'delete'; -- nope, delete
556 new_action = 'update'; -- yes, update
558 ELSIF NOT NEW.deleted THEN
559 new_action = 'insert'; -- revivify, AKA insert
561 RETURN NEW; -- was and is still deleted, don't ingest
563 ELSIF TG_OP = 'INSERT' THEN
564 new_action = 'insert'; -- brand new
566 RETURN OLD; -- really deleting the record
-- flag name is built per schema and action, e.g. ingest.queued.biblio.update
569 queuing_flag_name := 'ingest.queued.'||TG_TABLE_SCHEMA||'.'||new_action;
570 -- See if we should be queuing anything
571 SELECT enabled INTO queuing_flag
572 FROM config.internal_flag
573 WHERE name IN ('ingest.queued.all','ingest.queued.'||TG_TABLE_SCHEMA||'.all', queuing_flag_name)
-- a forced flag name matching this action turns queuing on when no flag row was selected
577 SELECT action.get_queued_ingest_force() INTO queuing_force;
578 IF queuing_flag IS NULL AND queuing_force = queuing_flag_name THEN
579 queuing_flag := TRUE;
582 -- you (or part of authority propagation) can forcibly disable specific queuing actions
583 IF queuing_force = queuing_flag_name||'.disabled' THEN
584 queuing_flag := FALSE;
587 -- And if we should be queuing ...
589 ingest_queue := action.get_ingest_queue();
591 -- ... but this is NOT a named or forced queue request (marc editor update, say, or vandelay overlay)...
592 IF queuing_force IS NULL AND ingest_queue IS NULL AND new_action = 'update' THEN -- re-ingest?
594 PERFORM * FROM config.internal_flag WHERE name = 'ingest.reingest.force_on_same_marc' AND enabled;
596 -- ... then don't do anything if ingest.reingest.force_on_same_marc is not enabled and the MARC hasn't changed
597 IF NOT FOUND AND OLD.marc = NEW.marc THEN
602 -- Otherwise, attempt to enqueue
603 SELECT action.enqueue_ingest_entry( NEW.id, TG_TABLE_SCHEMA, NOW(), ingest_queue, new_action, old_state_data) INTO queuing_success;
606 -- If queuing was not requested, or failed for some reason, do it live.
607 IF NOT queuing_success THEN
609 RAISE WARNING 'Enqueuing of %.record_entry % for ingest failed, attempting direct ingest', TG_TABLE_SCHEMA, NEW.id;
612 IF new_action = 'delete' THEN
613 IF TG_TABLE_SCHEMA = 'biblio' THEN
614 SELECT metabib.indexing_delete(NEW.*, old_state_data) INTO ingest_success;
615 ELSIF TG_TABLE_SCHEMA = 'authority' THEN
616 SELECT authority.indexing_delete(NEW.*, old_state_data) INTO ingest_success;
619 IF TG_TABLE_SCHEMA = 'biblio' THEN
620 SELECT metabib.indexing_update(NEW.*, new_action = 'insert', old_state_data) INTO ingest_success;
621 ELSIF TG_TABLE_SCHEMA = 'authority' THEN
622 SELECT authority.indexing_update(NEW.*, new_action = 'insert', old_state_data) INTO ingest_success;
-- a failed direct ingest raises an exception or a warning
-- (presumably chosen by the ingest.queued.abort_on_error flag looked up below)
626 IF NOT ingest_success THEN
627 PERFORM * FROM config.internal_flag WHERE name = 'ingest.queued.abort_on_error' AND enabled;
629 RAISE EXCEPTION 'Ingest of %.record_entry % failed', TG_TABLE_SCHEMA, NEW.id;
631 RAISE WARNING 'Ingest of %.record_entry % failed', TG_TABLE_SCHEMA, NEW.id;
638 $func$ LANGUAGE PLPGSQL;
-- Replace the existing ingest triggers on both record tables so that each
-- one runs evergreen.indexing_ingest_or_delete() after every row insert or update.
640 DROP TRIGGER aaa_indexing_ingest_or_delete ON biblio.record_entry;
641 DROP TRIGGER aaa_auth_ingest_or_delete ON authority.record_entry;
643 CREATE TRIGGER aaa_indexing_ingest_or_delete AFTER INSERT OR UPDATE ON biblio.record_entry FOR EACH ROW EXECUTE PROCEDURE evergreen.indexing_ingest_or_delete ();
644 CREATE TRIGGER aaa_auth_ingest_or_delete AFTER INSERT OR UPDATE ON authority.record_entry FOR EACH ROW EXECUTE PROCEDURE evergreen.indexing_ingest_or_delete ();
-- Rebuild the record attribute data for one bib record.
--   rid         bib record id
--   pattr_list  attribute names to process; when NULL the list is taken from
--               config.record_attr_definition
--   prmarc      MARC text of the record; fetched from biblio.record_entry
--               when NULL
--   rdeleted    when TRUE the attribute list is rebuilt from
--               config.record_attr_definition even if pattr_list was supplied
-- Non-composite attributes are extracted first (by tag, fixed field, XPath or
-- physical characteristic), normalized, and collected into an id vector;
-- composite attributes are evaluated afterwards against that vector. Sorter
-- values go to metabib.record_sorter and the final vector is upserted into
-- metabib.record_attr_vector_list.
646 CREATE OR REPLACE FUNCTION metabib.reingest_record_attributes (rid BIGINT, pattr_list TEXT[] DEFAULT NULL, prmarc TEXT DEFAULT NULL, rdeleted BOOL DEFAULT TRUE) RETURNS VOID AS $func$
648 transformed_xml TEXT;
649 rmarc TEXT := prmarc;
653 xfrm config.xml_transform%ROWTYPE;
654 attr_vector INT[] := '{}'::INT[];
655 attr_vector_tmp INT[];
656 attr_list TEXT[] := pattr_list;
658 norm_attr_value TEXT[];
661 attr_def config.record_attr_definition%ROWTYPE;
662 ccvm_row config.coded_value_map%ROWTYPE;
666 IF attr_list IS NULL OR rdeleted THEN -- need to do the full dance on INSERT or undelete
667 SELECT ARRAY_AGG(name) INTO attr_list FROM config.record_attr_definition
670 fixed_field IS NOT NULL OR
672 phys_char_sf IS NOT NULL OR
679 IF rmarc IS NULL THEN
680 SELECT marc INTO rmarc FROM biblio.record_entry WHERE id = rid;
683 FOR attr_def IN SELECT * FROM config.record_attr_definition WHERE NOT composite AND name = ANY( attr_list ) ORDER BY format LOOP
685 jump_past := FALSE; -- This gets set when we are non-multi and have found something
686 attr_value := '{}'::TEXT[];
687 norm_attr_value := '{}'::TEXT[];
688 attr_vector_tmp := '{}'::INT[];
-- ccvm_row.id is non-NULL when the attribute has at least one coded value map row
690 SELECT * INTO ccvm_row FROM config.coded_value_map c WHERE c.ctype = attr_def.name LIMIT 1;
692 IF attr_def.tag IS NOT NULL THEN -- tag (and optional subfield list) selection
693 SELECT ARRAY_AGG(value) INTO attr_value
694 FROM (SELECT * FROM metabib.full_rec ORDER BY tag, subfield) AS x
696 AND tag LIKE attr_def.tag
698 WHEN attr_def.sf_list IS NOT NULL
699 THEN POSITION(subfield IN attr_def.sf_list) > 0
-- single-valued attributes collapse all matches into one joined string
705 IF NOT attr_def.multi THEN
706 attr_value := ARRAY[ARRAY_TO_STRING(attr_value, COALESCE(attr_def.joiner,' '))];
711 IF NOT jump_past AND attr_def.fixed_field IS NOT NULL THEN -- a named fixed field, see config.marc21_ff_pos_map.fixed_field
712 attr_value := attr_value || vandelay.marc21_extract_fixed_field_list(rmarc, attr_def.fixed_field);
714 IF NOT attr_def.multi THEN
715 attr_value := ARRAY[attr_value[1]];
720 IF NOT jump_past AND attr_def.xpath IS NOT NULL THEN -- and xpath expression
722 SELECT INTO xfrm * FROM config.xml_transform WHERE name = attr_def.format;
724 -- See if we can skip the XSLT ... it's expensive
725 IF prev_xfrm IS NULL OR prev_xfrm <> xfrm.name THEN
726 -- Can't skip the transform
727 IF xfrm.xslt <> '---' THEN
728 transformed_xml := oils_xslt_process(rmarc,xfrm.xslt);
730 transformed_xml := rmarc;
733 prev_xfrm := xfrm.name;
736 IF xfrm.name IS NULL THEN
737 -- just grab the marcxml (empty) transform
738 SELECT INTO xfrm * FROM config.xml_transform WHERE xslt = '---' LIMIT 1;
739 prev_xfrm := xfrm.name;
742 FOR tmp_xml IN SELECT UNNEST(oils_xpath(attr_def.xpath, transformed_xml, ARRAY[ARRAY[xfrm.prefix, xfrm.namespace_uri]])) LOOP
743 tmp_val := oils_xpath_string(
746 COALESCE(attr_def.joiner,' '),
747 ARRAY[ARRAY[xfrm.prefix, xfrm.namespace_uri]]
749 IF tmp_val IS NOT NULL AND BTRIM(tmp_val) <> '' THEN
750 attr_value := attr_value || tmp_val;
751 EXIT WHEN NOT attr_def.multi;
756 IF NOT jump_past AND attr_def.phys_char_sf IS NOT NULL THEN -- a named Physical Characteristic, see config.marc21_physical_characteristic_*_map
757 SELECT ARRAY_AGG(m.value) INTO tmp_array
758 FROM vandelay.marc21_physical_characteristics(rmarc) v
759 LEFT JOIN config.marc21_physical_characteristic_value_map m ON (m.id = v.value)
760 WHERE v.subfield = attr_def.phys_char_sf AND (m.value IS NOT NULL AND BTRIM(m.value) <> '')
761 AND ( ccvm_row.id IS NULL OR ( ccvm_row.id IS NOT NULL AND v.id IS NOT NULL) );
763 attr_value := attr_value || tmp_array;
765 IF NOT attr_def.multi THEN
766 attr_value := ARRAY[attr_value[1]];
771 -- apply index normalizers to attr_value
772 FOR tmp_val IN SELECT value FROM UNNEST(attr_value) x(value) LOOP
774 SELECT n.func AS func,
775 n.param_count AS param_count,
777 FROM config.index_normalizer n
778 JOIN config.record_attr_index_norm_map m ON (m.norm = n.id)
779 WHERE attr = attr_def.name
-- each normalizer is invoked dynamically: func(<quoted value>[, params...])
781 EXECUTE 'SELECT ' || normalizer.func || '(' ||
782 COALESCE( quote_literal( tmp_val ), 'NULL' ) ||
784 WHEN normalizer.param_count > 0
785 THEN ',' || REPLACE(REPLACE(BTRIM(normalizer.params,'[]'),E'\'',E'\\\''),E'"',E'\'')
791 IF tmp_val IS NOT NULL AND tmp_val <> '' THEN
792 -- note that a string that contains only blanks
793 -- is a valid value for some attributes
794 norm_attr_value := norm_attr_value || tmp_val;
798 IF attr_def.filter THEN
799 -- Create unknown uncontrolled values and find the IDs of the values
800 IF ccvm_row.id IS NULL THEN
801 FOR tmp_val IN SELECT value FROM UNNEST(norm_attr_value) x(value) LOOP
802 IF tmp_val IS NOT NULL AND BTRIM(tmp_val) <> '' THEN
803 BEGIN -- use subtransaction to isolate unique constraint violations
804 INSERT INTO metabib.uncontrolled_record_attr_value ( attr, value ) VALUES ( attr_def.name, tmp_val );
805 EXCEPTION WHEN unique_violation THEN END;
809 SELECT ARRAY_AGG(id) INTO attr_vector_tmp FROM metabib.uncontrolled_record_attr_value WHERE attr = attr_def.name AND value = ANY( norm_attr_value );
811 SELECT ARRAY_AGG(id) INTO attr_vector_tmp FROM config.coded_value_map WHERE ctype = attr_def.name AND code = ANY( norm_attr_value );
814 -- Add the new value to the vector
815 attr_vector := attr_vector || attr_vector_tmp;
-- a sorter attribute keeps exactly one row per record: the first normalized value
818 IF attr_def.sorter THEN
819 DELETE FROM metabib.record_sorter WHERE source = rid AND attr = attr_def.name;
820 IF norm_attr_value[1] IS NOT NULL THEN
821 INSERT INTO metabib.record_sorter (source, attr, value) VALUES (rid, attr_def.name, norm_attr_value[1]);
827 /* We may need to rewrite the vlist to contain
828 the intersection of new values for requested
829 attrs and old values for ignored attrs. To
830 do this, we take the old attr vlist and
831 subtract any values that are valid for the
832 requested attrs, and then add back the new
833 set of attr values. */
835 IF ARRAY_LENGTH(pattr_list, 1) > 0 THEN
836 SELECT vlist INTO attr_vector_tmp FROM metabib.record_attr_vector_list WHERE source = rid;
837 SELECT attr_vector_tmp - ARRAY_AGG(id::INT) INTO attr_vector_tmp FROM metabib.full_attr_id_map WHERE attr = ANY (pattr_list);
838 attr_vector := attr_vector || attr_vector_tmp;
841 -- On to composite attributes, now that the record attrs have been pulled. Processed in name order, so later composite
842 -- attributes can depend on earlier ones.
843 PERFORM metabib.compile_composite_attr_cache_init();
844 FOR attr_def IN SELECT * FROM config.record_attr_definition WHERE composite AND name = ANY( attr_list ) ORDER BY name LOOP
846 FOR ccvm_row IN SELECT * FROM config.coded_value_map c WHERE c.ctype = attr_def.name ORDER BY value LOOP
848 tmp_val := metabib.compile_composite_attr( ccvm_row.id );
849 CONTINUE WHEN tmp_val IS NULL OR tmp_val = ''; -- nothing to do
851 IF attr_def.filter THEN
852 IF attr_vector @@ tmp_val::query_int THEN
853 attr_vector = attr_vector + intset(ccvm_row.id);
854 EXIT WHEN NOT attr_def.multi;
858 IF attr_def.sorter THEN
-- NOTE(review): unlike the filter branch above, tmp_val is not cast to query_int here - confirm this is intended
859 IF attr_vector @@ tmp_val THEN
860 DELETE FROM metabib.record_sorter WHERE source = rid AND attr = attr_def.name;
861 INSERT INTO metabib.record_sorter (source, attr, value) VALUES (rid, attr_def.name, ccvm_row.code);
-- only a non-empty vector is stored; an existing row for the record is overwritten
869 IF ARRAY_LENGTH(attr_vector, 1) > 0 THEN
870 INSERT INTO metabib.record_attr_vector_list (source, vlist) VALUES (rid, attr_vector)
871 ON CONFLICT (source) DO UPDATE SET vlist = EXCLUDED.vlist;
876 $func$ LANGUAGE PLPGSQL;
-- Propagate a change in authority record aid to bib record bid.
--
-- When either the 'ingest.queued.all' or the
-- 'ingest.queued.authority.propagate' global flag is enabled, the work is
-- queued as a 'propagate' entry for the bib, with the authority id carried in
-- the entry's state data (action.process_ingest_queue_entry reads it back from
-- there). If queuing is not enabled, or the enqueue attempt reports failure,
-- the change is applied immediately via authority.apply_propagate_changes.
--
-- Returns aid.
CREATE OR REPLACE FUNCTION authority.propagate_changes
    (aid BIGINT, bid BIGINT) RETURNS BIGINT AS $func$
DECLARE
    queuing_success BOOL := FALSE;
BEGIN

    PERFORM 1 FROM config.global_flag
        WHERE name IN ('ingest.queued.all','ingest.queued.authority.propagate')
            AND enabled;

    IF FOUND THEN
        -- XXX enqueue special 'propagate' bib action
        -- The fourth argument is queue_id (INT): it must be passed explicitly
        -- as NULL so that 'propagate' lands in ingest_action and the authority
        -- id in old_state_data.
        SELECT action.enqueue_ingest_entry( bid, 'biblio', NOW(), NULL, 'propagate', aid::TEXT) INTO queuing_success;

        IF queuing_success THEN
            RETURN aid;
        END IF;
    END IF;

    -- not queued (or queuing failed): apply the change right away
    PERFORM authority.apply_propagate_changes(aid, bid);
    RETURN aid;
END;
$func$ LANGUAGE PLPGSQL;
-- Apply the current state of authority record aid to bib record bid.
-- The bib's MARC is merged with the authority's overlay template; when the
-- merge leaves the MARC unchanged there is nothing to do. Otherwise the bib
-- row is updated, optionally together with its editor and edit_date.
-- The queued-ingest force value is set to
-- 'ingest.queued.biblio.update.disabled' around the UPDATE and cleared again
-- afterwards.
902 CREATE OR REPLACE FUNCTION authority.apply_propagate_changes
903 (aid BIGINT, bid BIGINT) RETURNS BIGINT AS $func$
905 bib_forced BOOL := FALSE;
906 bib_rec biblio.record_entry%ROWTYPE;
910 SELECT INTO bib_rec * FROM biblio.record_entry WHERE id = bid;
912 new_marc := vandelay.merge_record_xml(
913 bib_rec.marc, authority.generate_overlay_template(aid));
915 IF new_marc = bib_rec.marc THEN
916 -- Authority record change had no impact on this bib record.
917 -- Nothing left to do.
-- the flag below governs whether the bib's editor / edit_date are refreshed
921 PERFORM 1 FROM config.global_flag
922 WHERE name = 'ingest.disable_authority_auto_update_bib_meta'
926 -- update the bib record editor and edit_date
928 SELECT editor FROM authority.record_entry WHERE id = aid);
929 bib_rec.edit_date = NOW();
-- the ingest trigger treats a force value of <flag name>.disabled as "do not queue this action"
932 PERFORM action.set_queued_ingest_force('ingest.queued.biblio.update.disabled');
934 UPDATE biblio.record_entry SET
936 editor = bib_rec.editor,
937 edit_date = bib_rec.edit_date
940 PERFORM action.clear_queued_ingest_force();
945 $func$ LANGUAGE PLPGSQL;
-- Rebuild the facet, display, browse and search field entries for one bib
-- record.
--   skip_facet / skip_display / skip_browse / skip_search
--                TRUE suppresses that class outright; otherwise the class is
--                skipped only when the matching ingest.skip_*_indexing
--                internal flag is enabled
--   only_fields  metabib field ids to restrict the work to; an empty array
--                means every row of config.metabib_field
-- Existing rows of the classes being rebuilt are deleted for the selected
-- fields, then fresh rows are inserted from biblio.extract_metabib_field_entry.
-- When search entries were rebuilt, the combined index vectors are refreshed.
947 CREATE OR REPLACE FUNCTION metabib.reingest_metabib_field_entries(
949 skip_facet BOOL DEFAULT FALSE,
950 skip_display BOOL DEFAULT FALSE,
951 skip_browse BOOL DEFAULT FALSE,
952 skip_search BOOL DEFAULT FALSE,
953 only_fields INT[] DEFAULT '{}'::INT[]
954 ) RETURNS VOID AS $func$
957 ind_data metabib.field_entry_template%ROWTYPE;
958 mbe_row metabib.browse_entry%ROWTYPE;
965 field_list INT[] := only_fields;
966 field_types TEXT[] := '{}'::TEXT[];
969 IF field_list = '{}'::INT[] THEN
970 SELECT ARRAY_AGG(id) INTO field_list FROM config.metabib_field;
-- effective skip = explicit TRUE argument, else whether the internal flag is enabled
973 SELECT COALESCE(NULLIF(skip_facet, FALSE), EXISTS (SELECT enabled FROM config.internal_flag WHERE name = 'ingest.skip_facet_indexing' AND enabled)) INTO b_skip_facet;
974 SELECT COALESCE(NULLIF(skip_display, FALSE), EXISTS (SELECT enabled FROM config.internal_flag WHERE name = 'ingest.skip_display_indexing' AND enabled)) INTO b_skip_display;
975 SELECT COALESCE(NULLIF(skip_browse, FALSE), EXISTS (SELECT enabled FROM config.internal_flag WHERE name = 'ingest.skip_browse_indexing' AND enabled)) INTO b_skip_browse;
976 SELECT COALESCE(NULLIF(skip_search, FALSE), EXISTS (SELECT enabled FROM config.internal_flag WHERE name = 'ingest.skip_search_indexing' AND enabled)) INTO b_skip_search;
-- field_types lists the classes that will actually be extracted
978 IF NOT b_skip_facet THEN field_types := field_types || '{facet}'; END IF;
979 IF NOT b_skip_display THEN field_types := field_types || '{display}'; END IF;
980 IF NOT b_skip_browse THEN field_types := field_types || '{browse}'; END IF;
981 IF NOT b_skip_search THEN field_types := field_types || '{search}'; END IF;
983 PERFORM * FROM config.internal_flag WHERE name = 'ingest.assume_inserts_only' AND enabled;
985 IF NOT b_skip_search THEN
986 FOR fclass IN SELECT * FROM config.metabib_class LOOP
987 EXECUTE $$DELETE FROM metabib.$$ || fclass.name || $$_field_entry WHERE source = $$ || bib_id || $$ AND field = ANY($1)$$ USING field_list;
990 IF NOT b_skip_facet THEN
991 DELETE FROM metabib.facet_entry WHERE source = bib_id AND field = ANY(field_list);
993 IF NOT b_skip_display THEN
994 DELETE FROM metabib.display_entry WHERE source = bib_id AND field = ANY(field_list);
996 IF NOT b_skip_browse THEN
997 DELETE FROM metabib.browse_entry_def_map WHERE source = bib_id AND def = ANY(field_list);
1001 FOR ind_data IN SELECT * FROM biblio.extract_metabib_field_entry( bib_id, ' ', field_types, field_list ) LOOP
1003 -- don't store what has been normalized away
1004 CONTINUE WHEN ind_data.value IS NULL;
-- negative field ids are flipped back to positive before use
1006 IF ind_data.field < 0 THEN
1007 ind_data.field = -1 * ind_data.field;
1010 IF ind_data.facet_field AND NOT b_skip_facet THEN
1011 INSERT INTO metabib.facet_entry (field, source, value)
1012 VALUES (ind_data.field, ind_data.source, ind_data.value);
1015 IF ind_data.display_field AND NOT b_skip_display THEN
1016 INSERT INTO metabib.display_entry (field, source, value)
1017 VALUES (ind_data.field, ind_data.source, ind_data.value);
1021 IF ind_data.browse_field AND NOT b_skip_browse THEN
1022 -- A caveat about this SELECT: this should take care of replacing
1023 -- old mbe rows when data changes, but not if normalization (by
1024 -- which I mean specifically the output of
1025 -- evergreen.oils_tsearch2()) changes. It may or may not be
1026 -- expensive to add a comparison of index_vector to index_vector
1027 -- to the WHERE clause below.
1029 CONTINUE WHEN ind_data.sort_value IS NULL;
1031 value_prepped := metabib.browse_normalize(ind_data.value, ind_data.field);
1032 IF ind_data.browse_nocase THEN -- for "nocase" browse definions, look for a preexisting row that matches case-insensitively on value and use that
1033 SELECT INTO mbe_row * FROM metabib.browse_entry
1034 WHERE evergreen.lowercase(value) = evergreen.lowercase(value_prepped) AND sort_value = ind_data.sort_value
1035 ORDER BY sort_value, value LIMIT 1; -- gotta pick something, I guess
1038 IF mbe_row.id IS NOT NULL THEN -- asked to check for, and found, a "nocase" version to use
1039 mbe_id := mbe_row.id;
1040 ELSE -- otherwise, an UPSERT-protected variant
1041 INSERT INTO metabib.browse_entry
1042 ( value, sort_value ) VALUES
1043 ( value_prepped, ind_data.sort_value )
1044 ON CONFLICT (sort_value, value) DO UPDATE SET sort_value = EXCLUDED.sort_value -- must update a row to return an existing id
1045 RETURNING id INTO mbe_id;
1048 INSERT INTO metabib.browse_entry_def_map (entry, def, source, authority)
1049 VALUES (mbe_id, ind_data.field, ind_data.source, ind_data.authority);
1052 IF ind_data.search_field AND NOT b_skip_search THEN
1053 -- Avoid inserting duplicate rows
1054 EXECUTE 'SELECT 1 FROM metabib.' || ind_data.field_class ||
1055 '_field_entry WHERE field = $1 AND source = $2 AND value = $3'
1056 INTO mbe_id USING ind_data.field, ind_data.source, ind_data.value;
1057 -- RAISE NOTICE 'Search for an already matching row returned %', mbe_id;
1058 IF mbe_id IS NULL THEN
1060 INSERT INTO metabib.$$ || ind_data.field_class || $$_field_entry (field, source, value)
1062 quote_literal(ind_data.field) || $$, $$ ||
1063 quote_literal(ind_data.source) || $$, $$ ||
1064 quote_literal(ind_data.value) ||
1071 IF NOT b_skip_search THEN
1072 PERFORM metabib.update_combined_index_vectors(bib_id);
1073 PERFORM * FROM config.internal_flag WHERE name = 'ingest.disable_symspell_reification' AND enabled;
1075 PERFORM search.symspell_dictionary_reify();
1081 $func$ LANGUAGE PLPGSQL;