3 SELECT evergreen.upgrade_deps_block_check('1369', :eg_version);
5 INSERT INTO config.global_flag (name, enabled, label) VALUES (
6 'ingest.queued.max_threads', TRUE,
8 'ingest.queued.max_threads',
9 'Queued Ingest: Maximum number of database workers allowed for queued ingest processes',
13 'ingest.queued.abort_on_error', FALSE,
15 'ingest.queued.abort_on_error',
16 'Queued Ingest: Abort transaction on ingest error rather than simply logging an error',
20 'ingest.queued.authority.propagate', FALSE,
22 'ingest.queued.authority.propagate',
23 'Queued Ingest: Queue all bib record updates on authority change propagation, even if bib queuing is not generally enabled',
27 'ingest.queued.all', FALSE,
30 'Queued Ingest: Use Queued Ingest for all bib and authority record ingest',
34 'ingest.queued.biblio.all', FALSE,
36 'ingest.queued.biblio.all',
37 'Queued Ingest: Use Queued Ingest for all bib record ingest',
41 'ingest.queued.authority.all', FALSE,
43 'ingest.queued.authority.all',
44 'Queued Ingest: Use Queued Ingest for all authority record ingest',
48 'ingest.queued.biblio.insert.marc_edit_inline', TRUE,
50 'ingest.queued.biblio.insert.marc_edit_inline',
51 'Queued Ingest: Do NOT use Queued Ingest when creating a new bib, or undeleting a bib, via the MARC editor',
55 'ingest.queued.biblio.insert', FALSE,
57 'ingest.queued.biblio.insert',
58 'Queued Ingest: Use Queued Ingest for bib record ingest on insert and undelete',
62 'ingest.queued.authority.insert', FALSE,
64 'ingest.queued.authority.insert',
65 'Queued Ingest: Use Queued Ingest for authority record ingest on insert and undelete',
69 'ingest.queued.biblio.update.marc_edit_inline', TRUE,
71 'ingest.queued.biblio.update.marc_edit_inline',
72 'Queued Ingest: Do NOT Use Queued Ingest when editing bib records via the MARC Editor',
76 'ingest.queued.biblio.update', FALSE,
78 'ingest.queued.biblio.update',
79 'Queued Ingest: Use Queued Ingest for bib record ingest on update',
83 'ingest.queued.authority.update', FALSE,
85 'ingest.queued.authority.update',
86 'Queued Ingest: Use Queued Ingest for authority record ingest on update',
90 'ingest.queued.biblio.delete', FALSE,
92 'ingest.queued.biblio.delete',
93 'Queued Ingest: Use Queued Ingest for bib record ingest on delete',
97 'ingest.queued.authority.delete', FALSE,
99 'ingest.queued.authority.delete',
100 'Queued Ingest: Use Queued Ingest for authority record ingest on delete',
106 UPDATE config.global_flag SET value = '20' WHERE name = 'ingest.queued.max_threads';
108 CREATE OR REPLACE FUNCTION search.symspell_maintain_entries () RETURNS TRIGGER AS $f$
111 new_value TEXT := NULL;
112 old_value TEXT := NULL;
116 IF TG_TABLE_SCHEMA = 'authority' THEN
117 IF TG_OP IN ('INSERT', 'UPDATE') THEN
123 SELECT m.field_class INTO search_class
124 FROM authority.control_set_auth_field_metabib_field_map_refs a
125 JOIN config.metabib_field m ON (a.metabib_field=m.id)
126 WHERE a.authority_field = _atag;
132 search_class := COALESCE(TG_ARGV[0], SPLIT_PART(TG_TABLE_NAME,'_',1));
135 IF TG_OP IN ('INSERT', 'UPDATE') THEN
136 new_value := NEW.value;
139 IF TG_OP IN ('DELETE', 'UPDATE') THEN
140 old_value := OLD.value;
143 IF new_value = old_value THEN
146 INSERT INTO search.symspell_dictionary_updates
147 SELECT txid_current(), *
148 FROM search.symspell_build_entries(
155 -- PERFORM * FROM search.symspell_build_and_merge_entries(new_value, search_class, old_value);
157 RETURN NULL; -- always fired AFTER
159 $f$ LANGUAGE PLPGSQL;
161 CREATE TABLE action.ingest_queue (
162 id SERIAL PRIMARY KEY,
163 created TIMESTAMPTZ NOT NULL DEFAULT NOW(),
164 run_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
165 who INT REFERENCES actor.usr (id) ON UPDATE CASCADE ON DELETE SET NULL DEFERRABLE INITIALLY DEFERRED,
166 start_time TIMESTAMPTZ,
167 end_time TIMESTAMPTZ,
172 CREATE TABLE action.ingest_queue_entry (
173 id BIGSERIAL PRIMARY KEY,
174 record BIGINT NOT NULL, -- points to a record id of the appropriate record_type
175 record_type TEXT NOT NULL,
176 action TEXT NOT NULL,
177 run_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
178 state_data TEXT NOT NULL DEFAULT '',
179 queue INT REFERENCES action.ingest_queue (id) ON UPDATE CASCADE ON DELETE CASCADE DEFERRABLE INITIALLY DEFERRED,
180 override_by BIGINT REFERENCES action.ingest_queue_entry (id) ON UPDATE CASCADE ON DELETE SET NULL DEFERRABLE INITIALLY DEFERRED,
181 ingest_time TIMESTAMPTZ,
182 fail_time TIMESTAMPTZ
184 CREATE UNIQUE INDEX record_pending_once ON action.ingest_queue_entry (record_type,record,state_data) WHERE ingest_time IS NULL AND override_by IS NULL;
185 CREATE INDEX entry_override_by_idx ON action.ingest_queue_entry (override_by) WHERE override_by IS NOT NULL;
187 CREATE OR REPLACE FUNCTION action.enqueue_ingest_entry (
189 rtype TEXT DEFAULT 'biblio',
190 when_to_run TIMESTAMPTZ DEFAULT NOW(),
191 queue_id INT DEFAULT NULL,
192 ingest_action TEXT DEFAULT 'update', -- will be the most common?
193 old_state_data TEXT DEFAULT ''
194 ) RETURNS BOOL AS $F$
196 new_entry action.ingest_queue_entry%ROWTYPE;
197 prev_del_entry action.ingest_queue_entry%ROWTYPE;
202 IF ingest_action = 'delete' THEN
203 -- first see if there is an outstanding entry
204 SELECT * INTO prev_del_entry
205 FROM action.ingest_queue_entry
206 WHERE qe.record = record_id
207 AND qe.state_date = old_state_data
208 AND qe.record_type = rtype
209 AND qe.ingest_time IS NULL
210 AND qe.override_by IS NULL;
213 WITH existing_queue_entry_cte AS (
214 SELECT queue_id AS queue,
215 rtype AS record_type,
217 qe.id AS override_by,
218 ingest_action AS action,
220 old_state_data AS state_data
221 FROM action.ingest_queue_entry qe
222 JOIN action.ingest_queue q ON (qe.queue = q.id)
223 WHERE qe.record = record_id
224 AND q.end_time IS NULL
225 AND qe.record_type = rtype
226 AND qe.state_data = old_state_data
227 AND qe.ingest_time IS NULL
228 AND qe.fail_time IS NULL
229 AND qe.override_by IS NULL
230 ), existing_nonqueue_entry_cte AS (
231 SELECT queue_id AS queue,
232 rtype AS record_type,
234 qe.id AS override_by,
235 ingest_action AS action,
237 old_state_data AS state_data
238 FROM action.ingest_queue_entry qe
239 WHERE qe.record = record_id
241 AND qe.record_type = rtype
242 AND qe.state_data = old_state_data
243 AND qe.ingest_time IS NULL
244 AND qe.fail_time IS NULL
245 AND qe.override_by IS NULL
246 ), new_entry_cte AS (
247 SELECT * FROM existing_queue_entry_cte
249 SELECT * FROM existing_nonqueue_entry_cte
251 SELECT queue_id, rtype, record_id, NULL, ingest_action, COALESCE(when_to_run,NOW()), old_state_data
252 ), insert_entry_cte AS (
253 INSERT INTO action.ingest_queue_entry
254 (queue, record_type, record, override_by, action, run_at, state_data)
255 SELECT queue, record_type, record, override_by, action, run_at, state_data FROM new_entry_cte
256 ORDER BY 4 NULLS LAST, 6
259 ) SELECT * INTO new_entry FROM insert_entry_cte;
261 IF prev_del_entry.id IS NOT NULL THEN -- later delete overrides earlier unapplied entry
262 UPDATE action.ingest_queue_entry
263 SET override_by = new_entry.id
264 WHERE id = prev_del_entry.id;
266 UPDATE action.ingest_queue_entry
267 SET override_by = NULL
268 WHERE id = new_entry.id;
270 ELSIF new_entry.override_by IS NOT NULL THEN
271 RETURN TRUE; -- already handled, don't notify
274 NOTIFY queued_ingest;
277 EXCEPTION WHEN OTHERS THEN
278 GET STACKED DIAGNOSTICS diag_detail = PG_EXCEPTION_DETAIL,
279 diag_context = PG_EXCEPTION_CONTEXT;
280 RAISE WARNING '%\n%', diag_detail, diag_context;
283 $F$ LANGUAGE PLPGSQL;
285 CREATE OR REPLACE FUNCTION action.process_ingest_queue_entry (qeid BIGINT) RETURNS BOOL AS $func$
287 ingest_success BOOL := NULL;
288 qe action.ingest_queue_entry%ROWTYPE;
289 aid authority.record_entry.id%TYPE;
292 SELECT * INTO qe FROM action.ingest_queue_entry WHERE id = qeid;
293 IF qe.ingest_time IS NOT NULL OR qe.override_by IS NOT NULL THEN
294 RETURN TRUE; -- Already done
297 IF qe.action = 'delete' THEN
298 IF qe.record_type = 'biblio' THEN
299 SELECT metabib.indexing_delete(r.*, qe.state_data) INTO ingest_success FROM biblio.record_entry r WHERE r.id = qe.record;
300 ELSIF qe.record_type = 'authority' THEN
301 SELECT authority.indexing_delete(r.*, qe.state_data) INTO ingest_success FROM authority.record_entry r WHERE r.id = qe.record;
304 IF qe.record_type = 'biblio' THEN
305 IF qe.action = 'propagate' THEN
306 SELECT authority.apply_propagate_changes(qe.state_data::BIGINT, qe.record) INTO aid;
307 SELECT aid = qe.state_data::BIGINT INTO ingest_success;
309 SELECT metabib.indexing_update(r.*, qe.action = 'insert', qe.state_data) INTO ingest_success FROM biblio.record_entry r WHERE r.id = qe.record;
311 ELSIF qe.record_type = 'authority' THEN
312 SELECT authority.indexing_update(r.*, qe.action = 'insert', qe.state_data) INTO ingest_success FROM authority.record_entry r WHERE r.id = qe.record;
316 IF NOT ingest_success THEN
317 UPDATE action.ingest_queue_entry SET fail_time = NOW() WHERE id = qe.id;
318 PERFORM * FROM config.internal_flag WHERE name = 'ingest.queued.abort_on_error' AND enabled;
320 RAISE EXCEPTION 'Ingest action of % on %.record_entry % for queue entry % failed', qe.action, qe.record_type, qe.record, qe.id;
322 RAISE WARNING 'Ingest action of % on %.record_entry % for queue entry % failed', qe.action, qe.record_type, qe.record, qe.id;
325 UPDATE action.ingest_queue_entry SET ingest_time = NOW() WHERE id = qe.id;
328 RETURN ingest_success;
330 $func$ LANGUAGE PLPGSQL;
332 CREATE OR REPLACE FUNCTION action.complete_duplicated_entries () RETURNS TRIGGER AS $F$
334 IF NEW.ingest_time IS NOT NULL THEN
335 UPDATE action.ingest_queue_entry SET ingest_time = NEW.ingest_time WHERE override_by = NEW.id;
340 $F$ LANGUAGE PLPGSQL;
342 CREATE TRIGGER complete_duplicated_entries_trigger
343 AFTER UPDATE ON action.ingest_queue_entry
344 FOR EACH ROW WHEN (NEW.override_by IS NULL)
345 EXECUTE PROCEDURE action.complete_duplicated_entries();
347 CREATE OR REPLACE FUNCTION action.set_ingest_queue(INT) RETURNS VOID AS $$
348 $_SHARED{"ingest_queue_id"} = $_[0];
351 CREATE OR REPLACE FUNCTION action.get_ingest_queue() RETURNS INT AS $$
352 return $_SHARED{"ingest_queue_id"};
355 CREATE OR REPLACE FUNCTION action.clear_ingest_queue() RETURNS VOID AS $$
356 delete($_SHARED{"ingest_queue_id"});
359 CREATE OR REPLACE FUNCTION action.set_queued_ingest_force(TEXT) RETURNS VOID AS $$
360 $_SHARED{"ingest_queue_force"} = $_[0];
363 CREATE OR REPLACE FUNCTION action.get_queued_ingest_force() RETURNS TEXT AS $$
364 return $_SHARED{"ingest_queue_force"};
367 CREATE OR REPLACE FUNCTION action.clear_queued_ingest_force() RETURNS VOID AS $$
368 delete($_SHARED{"ingest_queue_force"});
371 ------------------ ingest functions ------------------
373 CREATE OR REPLACE FUNCTION metabib.indexing_delete (bib biblio.record_entry, extra TEXT DEFAULT NULL) RETURNS BOOL AS $func$
379 PERFORM * FROM config.internal_flag WHERE name = 'ingest.metarecord_mapping.preserve_on_delete' AND enabled;
382 PERFORM metabib.remap_metarecord_for_bib(bib.id, bib.fingerprint, TRUE, tmp_bool);
385 -- One needs to keep these around to support searches
386 -- with the #deleted modifier, so one should turn on the named
387 -- internal flag for that functionality.
388 DELETE FROM metabib.record_attr_vector_list WHERE source = bib.id;
391 DELETE FROM authority.bib_linking abl WHERE abl.bib = bib.id; -- Avoid updating fields in bibs that are no longer visible
392 DELETE FROM biblio.peer_bib_copy_map WHERE peer_record = bib.id; -- Separate any multi-homed items
393 DELETE FROM metabib.browse_entry_def_map WHERE source = bib.id; -- Don't auto-suggest deleted bibs
396 EXCEPTION WHEN OTHERS THEN
397 GET STACKED DIAGNOSTICS diag_detail = PG_EXCEPTION_DETAIL,
398 diag_context = PG_EXCEPTION_CONTEXT;
399 RAISE WARNING '%\n%', diag_detail, diag_context;
402 $func$ LANGUAGE PLPGSQL;
404 CREATE OR REPLACE FUNCTION metabib.indexing_update (bib biblio.record_entry, insert_only BOOL DEFAULT FALSE, extra TEXT DEFAULT NULL) RETURNS BOOL AS $func$
406 skip_facet BOOL := FALSE;
407 skip_display BOOL := FALSE;
408 skip_browse BOOL := FALSE;
409 skip_search BOOL := FALSE;
410 skip_auth BOOL := FALSE;
411 skip_full BOOL := FALSE;
412 skip_attrs BOOL := FALSE;
413 skip_luri BOOL := FALSE;
414 skip_mrmap BOOL := FALSE;
415 only_attrs TEXT[] := NULL;
416 only_fields INT[] := '{}'::INT[];
421 -- Record authority linking
422 SELECT extra LIKE '%skip_authority%' INTO skip_auth;
423 PERFORM * FROM config.internal_flag WHERE name = 'ingest.disable_authority_linking' AND enabled;
424 IF NOT FOUND AND NOT skip_auth THEN
425 PERFORM biblio.map_authority_linking( bib.id, bib.marc );
428 -- Flatten and insert the mfr data
429 SELECT extra LIKE '%skip_full_rec%' INTO skip_full;
430 PERFORM * FROM config.internal_flag WHERE name = 'ingest.disable_metabib_full_rec' AND enabled;
431 IF NOT FOUND AND NOT skip_full THEN
432 PERFORM metabib.reingest_metabib_full_rec(bib.id);
435 -- Now we pull out attribute data, which is dependent on the mfr for all but XPath-based fields
436 SELECT extra LIKE '%skip_attrs%' INTO skip_attrs;
437 PERFORM * FROM config.internal_flag WHERE name = 'ingest.disable_metabib_rec_descriptor' AND enabled;
438 IF NOT FOUND AND NOT skip_attrs THEN
439 IF extra ~ 'attr\(\s*(\w[ ,\w]*?)\s*\)' THEN
440 SELECT REGEXP_SPLIT_TO_ARRAY(
441 (REGEXP_MATCHES(extra, 'attr\(\s*(\w[ ,\w]*?)\s*\)'))[1],
446 PERFORM metabib.reingest_record_attributes(bib.id, only_attrs, bib.marc, insert_only);
449 -- Gather and insert the field entry data
450 SELECT extra LIKE '%skip_facet%' INTO skip_facet;
451 SELECT extra LIKE '%skip_display%' INTO skip_display;
452 SELECT extra LIKE '%skip_browse%' INTO skip_browse;
453 SELECT extra LIKE '%skip_search%' INTO skip_search;
455 IF extra ~ 'field_list\(\s*(\d[ ,\d]+)\s*\)' THEN
456 SELECT REGEXP_SPLIT_TO_ARRAY(
457 (REGEXP_MATCHES(extra, 'field_list\(\s*(\d[ ,\d]+)\s*\)'))[1],
459 )::INT[] INTO only_fields;
462 IF NOT skip_facet OR NOT skip_display OR NOT skip_browse OR NOT skip_search THEN
463 PERFORM metabib.reingest_metabib_field_entries(bib.id, skip_facet, skip_display, skip_browse, skip_search, only_fields);
467 SELECT extra LIKE '%skip_luri%' INTO skip_luri;
468 PERFORM * FROM config.internal_flag WHERE name = 'ingest.disable_located_uri' AND enabled;
469 IF NOT FOUND AND NOT skip_luri THEN PERFORM biblio.extract_located_uris( bib.id, bib.marc, bib.editor ); END IF;
471 -- (re)map metarecord-bib linking
472 SELECT extra LIKE '%skip_mrmap%' INTO skip_mrmap;
473 IF insert_only THEN -- if not deleted and performing an insert, check for the flag
474 PERFORM * FROM config.internal_flag WHERE name = 'ingest.metarecord_mapping.skip_on_insert' AND enabled;
475 IF NOT FOUND AND NOT skip_mrmap THEN
476 PERFORM metabib.remap_metarecord_for_bib( bib.id, bib.fingerprint );
478 ELSE -- we're doing an update, and we're not deleted, remap
479 PERFORM * FROM config.internal_flag WHERE name = 'ingest.metarecord_mapping.skip_on_update' AND enabled;
480 IF NOT FOUND AND NOT skip_mrmap THEN
481 PERFORM metabib.remap_metarecord_for_bib( bib.id, bib.fingerprint );
486 EXCEPTION WHEN OTHERS THEN
487 GET STACKED DIAGNOSTICS diag_detail = PG_EXCEPTION_DETAIL,
488 diag_context = PG_EXCEPTION_CONTEXT;
489 RAISE WARNING '%\n%', diag_detail, diag_context;
492 $func$ LANGUAGE PLPGSQL;
494 CREATE OR REPLACE FUNCTION authority.indexing_delete (auth authority.record_entry, extra TEXT DEFAULT NULL) RETURNS BOOL AS $func$
500 DELETE FROM authority.bib_linking WHERE authority = NEW.id; -- Avoid updating fields in bibs that are no longer visible
501 DELETE FROM authority.full_rec WHERE record = NEW.id; -- Avoid validating fields against deleted authority records
502 DELETE FROM authority.simple_heading WHERE record = NEW.id;
503 -- Should remove matching $0 from controlled fields at the same time?
505 -- XXX What do we about the actual linking subfields present in
506 -- authority records that target this one when this happens?
507 DELETE FROM authority.authority_linking WHERE source = NEW.id OR target = NEW.id;
510 EXCEPTION WHEN OTHERS THEN
511 GET STACKED DIAGNOSTICS diag_detail = PG_EXCEPTION_DETAIL,
512 diag_context = PG_EXCEPTION_CONTEXT;
513 RAISE WARNING '%\n%', diag_detail, diag_context;
516 $func$ LANGUAGE PLPGSQL;
519 CREATE OR REPLACE FUNCTION authority.indexing_update (auth authority.record_entry, insert_only BOOL DEFAULT FALSE, old_heading TEXT DEFAULT NULL) RETURNS BOOL AS $func$
521 ashs authority.simple_heading%ROWTYPE;
522 mbe_row metabib.browse_entry%ROWTYPE;
529 -- Unless there's a setting stopping us, propagate these updates to any linked bib records when the heading changes
530 PERFORM * FROM config.internal_flag WHERE name = 'ingest.disable_authority_auto_update' AND enabled;
532 IF NOT FOUND AND auth.heading <> old_heading THEN
533 PERFORM authority.propagate_changes(auth.id);
536 IF NOT insert_only THEN
537 DELETE FROM authority.authority_linking WHERE source = auth.id;
538 DELETE FROM authority.simple_heading WHERE record = auth.id;
541 INSERT INTO authority.authority_linking (source, target, field)
542 SELECT source, target, field FROM authority.calculate_authority_linking(
543 auth.id, auth.control_set, auth.marc::XML
546 FOR ashs IN SELECT * FROM authority.simple_heading_set(auth.marc) LOOP
548 INSERT INTO authority.simple_heading (record,atag,value,sort_value,thesaurus)
549 VALUES (ashs.record, ashs.atag, ashs.value, ashs.sort_value, ashs.thesaurus);
550 ash_id := CURRVAL('authority.simple_heading_id_seq'::REGCLASS);
552 SELECT INTO mbe_row * FROM metabib.browse_entry
553 WHERE value = ashs.value AND sort_value = ashs.sort_value;
556 mbe_id := mbe_row.id;
558 INSERT INTO metabib.browse_entry
559 ( value, sort_value ) VALUES
560 ( ashs.value, ashs.sort_value );
562 mbe_id := CURRVAL('metabib.browse_entry_id_seq'::REGCLASS);
565 INSERT INTO metabib.browse_entry_simple_heading_map (entry,simple_heading) VALUES (mbe_id,ash_id);
569 -- Flatten and insert the afr data
570 PERFORM * FROM config.internal_flag WHERE name = 'ingest.disable_authority_full_rec' AND enabled;
572 PERFORM authority.reingest_authority_full_rec(auth.id);
573 PERFORM * FROM config.internal_flag WHERE name = 'ingest.disable_authority_rec_descriptor' AND enabled;
575 PERFORM authority.reingest_authority_rec_descriptor(auth.id);
579 PERFORM * FROM config.internal_flag WHERE name = 'ingest.disable_symspell_reification' AND enabled;
581 PERFORM search.symspell_dictionary_reify();
585 EXCEPTION WHEN OTHERS THEN
586 GET STACKED DIAGNOSTICS diag_detail = PG_EXCEPTION_DETAIL,
587 diag_context = PG_EXCEPTION_CONTEXT;
588 RAISE WARNING '%\n%', diag_detail, diag_context;
591 $func$ LANGUAGE PLPGSQL;
593 CREATE OR REPLACE FUNCTION evergreen.indexing_ingest_or_delete () RETURNS TRIGGER AS $func$
595 old_state_data TEXT := '';
598 queuing_flag_name TEXT;
599 queuing_flag BOOL := FALSE;
600 queuing_success BOOL := FALSE;
601 ingest_success BOOL := FALSE;
605 -- Identify the ingest action type
606 IF TG_OP = 'UPDATE' THEN
608 -- Gather type-specific data for later use
609 IF TG_TABLE_SCHEMA = 'authority' THEN
610 old_state_data = OLD.heading;
613 IF NOT OLD.deleted THEN -- maybe reingest?
615 new_action = 'delete'; -- nope, delete
617 new_action = 'update'; -- yes, update
619 ELSIF NOT NEW.deleted THEN
620 new_action = 'insert'; -- revivify, AKA insert
622 RETURN NEW; -- was and is still deleted, don't ingest
624 ELSIF TG_OP = 'INSERT' THEN
625 new_action = 'insert'; -- brand new
627 RETURN OLD; -- really deleting the record
630 queuing_flag_name := 'ingest.queued.'||TG_TABLE_SCHEMA||'.'||new_action;
631 -- See if we should be queuing anything
632 SELECT enabled INTO queuing_flag
633 FROM config.internal_flag
634 WHERE name IN ('ingest.queued.all','ingest.queued.'||TG_TABLE_SCHEMA||'.all', queuing_flag_name)
638 SELECT action.get_queued_ingest_force() INTO queuing_force;
639 IF queuing_flag IS NULL AND queuing_force = queuing_flag_name THEN
640 queuing_flag := TRUE;
643 -- you (or part of authority propagation) can forcibly disable specific queuing actions
644 IF queuing_force = queuing_flag_name||'.disabled' THEN
645 queuing_flag := FALSE;
648 -- And if we should be queuing ...
650 ingest_queue := action.get_ingest_queue();
652 -- ... but this is NOT a named or forced queue request (marc editor update, say, or vandelay overlay)...
653 IF queuing_force IS NULL AND ingest_queue IS NULL AND new_action = 'update' THEN -- re-ingest?
655 PERFORM * FROM config.internal_flag WHERE name = 'ingest.reingest.force_on_same_marc' AND enabled;
657 -- ... then don't do anything if ingest.reingest.force_on_same_marc is not enabled and the MARC hasn't changed
658 IF NOT FOUND AND OLD.marc = NEW.marc THEN
663 -- Otherwise, attempt to enqueue
664 SELECT action.enqueue_ingest_entry( NEW.id, TG_TABLE_SCHEMA, NOW(), ingest_queue, new_action, old_state_data) INTO queuing_success;
667 -- If queuing was not requested, or failed for some reason, do it live.
668 IF NOT queuing_success THEN
670 RAISE WARNING 'Enqueuing of %.record_entry % for ingest failed, attempting direct ingest', TG_TABLE_SCHEMA, NEW.id;
673 IF new_action = 'delete' THEN
674 IF TG_TABLE_SCHEMA = 'biblio' THEN
675 SELECT metabib.indexing_delete(NEW.*, old_state_data) INTO ingest_success;
676 ELSIF TG_TABLE_SCHEMA = 'authority' THEN
677 SELECT authority.indexing_delete(NEW.*, old_state_data) INTO ingest_success;
680 IF TG_TABLE_SCHEMA = 'biblio' THEN
681 SELECT metabib.indexing_update(NEW.*, new_action = 'insert', old_state_data) INTO ingest_success;
682 ELSIF TG_TABLE_SCHEMA = 'authority' THEN
683 SELECT authority.indexing_update(NEW.*, new_action = 'insert', old_state_data) INTO ingest_success;
687 IF NOT ingest_success THEN
688 PERFORM * FROM config.internal_flag WHERE name = 'ingest.queued.abort_on_error' AND enabled;
690 RAISE EXCEPTION 'Ingest of %.record_entry % failed', TG_TABLE_SCHEMA, NEW.id;
692 RAISE WARNING 'Ingest of %.record_entry % failed', TG_TABLE_SCHEMA, NEW.id;
699 $func$ LANGUAGE PLPGSQL;
701 DROP TRIGGER aaa_indexing_ingest_or_delete ON biblio.record_entry;
702 DROP TRIGGER aaa_auth_ingest_or_delete ON authority.record_entry;
704 CREATE TRIGGER aaa_indexing_ingest_or_delete AFTER INSERT OR UPDATE ON biblio.record_entry FOR EACH ROW EXECUTE PROCEDURE evergreen.indexing_ingest_or_delete ();
705 CREATE TRIGGER aaa_auth_ingest_or_delete AFTER INSERT OR UPDATE ON authority.record_entry FOR EACH ROW EXECUTE PROCEDURE evergreen.indexing_ingest_or_delete ();
707 CREATE OR REPLACE FUNCTION metabib.reingest_record_attributes (rid BIGINT, pattr_list TEXT[] DEFAULT NULL, prmarc TEXT DEFAULT NULL, rdeleted BOOL DEFAULT TRUE) RETURNS VOID AS $func$
709 transformed_xml TEXT;
710 rmarc TEXT := prmarc;
714 xfrm config.xml_transform%ROWTYPE;
715 attr_vector INT[] := '{}'::INT[];
716 attr_vector_tmp INT[];
717 attr_list TEXT[] := pattr_list;
719 norm_attr_value TEXT[];
722 attr_def config.record_attr_definition%ROWTYPE;
723 ccvm_row config.coded_value_map%ROWTYPE;
727 IF attr_list IS NULL OR rdeleted THEN -- need to do the full dance on INSERT or undelete
728 SELECT ARRAY_AGG(name) INTO attr_list FROM config.record_attr_definition
731 fixed_field IS NOT NULL OR
733 phys_char_sf IS NOT NULL OR
740 IF rmarc IS NULL THEN
741 SELECT marc INTO rmarc FROM biblio.record_entry WHERE id = rid;
744 FOR attr_def IN SELECT * FROM config.record_attr_definition WHERE NOT composite AND name = ANY( attr_list ) ORDER BY format LOOP
746 jump_past := FALSE; -- This gets set when we are non-multi and have found something
747 attr_value := '{}'::TEXT[];
748 norm_attr_value := '{}'::TEXT[];
749 attr_vector_tmp := '{}'::INT[];
751 SELECT * INTO ccvm_row FROM config.coded_value_map c WHERE c.ctype = attr_def.name LIMIT 1;
753 IF attr_def.tag IS NOT NULL THEN -- tag (and optional subfield list) selection
754 SELECT ARRAY_AGG(value) INTO attr_value
755 FROM (SELECT * FROM metabib.full_rec ORDER BY tag, subfield) AS x
757 AND tag LIKE attr_def.tag
759 WHEN attr_def.sf_list IS NOT NULL
760 THEN POSITION(subfield IN attr_def.sf_list) > 0
766 IF NOT attr_def.multi THEN
767 attr_value := ARRAY[ARRAY_TO_STRING(attr_value, COALESCE(attr_def.joiner,' '))];
772 IF NOT jump_past AND attr_def.fixed_field IS NOT NULL THEN -- a named fixed field, see config.marc21_ff_pos_map.fixed_field
773 attr_value := attr_value || vandelay.marc21_extract_fixed_field_list(rmarc, attr_def.fixed_field);
775 IF NOT attr_def.multi THEN
776 attr_value := ARRAY[attr_value[1]];
781 IF NOT jump_past AND attr_def.xpath IS NOT NULL THEN -- and xpath expression
783 SELECT INTO xfrm * FROM config.xml_transform WHERE name = attr_def.format;
785 -- See if we can skip the XSLT ... it's expensive
786 IF prev_xfrm IS NULL OR prev_xfrm <> xfrm.name THEN
787 -- Can't skip the transform
788 IF xfrm.xslt <> '---' THEN
789 transformed_xml := oils_xslt_process(rmarc,xfrm.xslt);
791 transformed_xml := rmarc;
794 prev_xfrm := xfrm.name;
797 IF xfrm.name IS NULL THEN
798 -- just grab the marcxml (empty) transform
799 SELECT INTO xfrm * FROM config.xml_transform WHERE xslt = '---' LIMIT 1;
800 prev_xfrm := xfrm.name;
803 FOR tmp_xml IN SELECT UNNEST(oils_xpath(attr_def.xpath, transformed_xml, ARRAY[ARRAY[xfrm.prefix, xfrm.namespace_uri]])) LOOP
804 tmp_val := oils_xpath_string(
807 COALESCE(attr_def.joiner,' '),
808 ARRAY[ARRAY[xfrm.prefix, xfrm.namespace_uri]]
810 IF tmp_val IS NOT NULL AND BTRIM(tmp_val) <> '' THEN
811 attr_value := attr_value || tmp_val;
812 EXIT WHEN NOT attr_def.multi;
817 IF NOT jump_past AND attr_def.phys_char_sf IS NOT NULL THEN -- a named Physical Characteristic, see config.marc21_physical_characteristic_*_map
818 SELECT ARRAY_AGG(m.value) INTO tmp_array
819 FROM vandelay.marc21_physical_characteristics(rmarc) v
820 LEFT JOIN config.marc21_physical_characteristic_value_map m ON (m.id = v.value)
821 WHERE v.subfield = attr_def.phys_char_sf AND (m.value IS NOT NULL AND BTRIM(m.value) <> '')
822 AND ( ccvm_row.id IS NULL OR ( ccvm_row.id IS NOT NULL AND v.id IS NOT NULL) );
824 attr_value := attr_value || tmp_array;
826 IF NOT attr_def.multi THEN
827 attr_value := ARRAY[attr_value[1]];
832 -- apply index normalizers to attr_value
833 FOR tmp_val IN SELECT value FROM UNNEST(attr_value) x(value) LOOP
835 SELECT n.func AS func,
836 n.param_count AS param_count,
838 FROM config.index_normalizer n
839 JOIN config.record_attr_index_norm_map m ON (m.norm = n.id)
840 WHERE attr = attr_def.name
842 EXECUTE 'SELECT ' || normalizer.func || '(' ||
843 COALESCE( quote_literal( tmp_val ), 'NULL' ) ||
845 WHEN normalizer.param_count > 0
846 THEN ',' || REPLACE(REPLACE(BTRIM(normalizer.params,'[]'),E'\'',E'\\\''),E'"',E'\'')
852 IF tmp_val IS NOT NULL AND tmp_val <> '' THEN
853 -- note that a string that contains only blanks
854 -- is a valid value for some attributes
855 norm_attr_value := norm_attr_value || tmp_val;
859 IF attr_def.filter THEN
860 -- Create unknown uncontrolled values and find the IDs of the values
861 IF ccvm_row.id IS NULL THEN
862 FOR tmp_val IN SELECT value FROM UNNEST(norm_attr_value) x(value) LOOP
863 IF tmp_val IS NOT NULL AND BTRIM(tmp_val) <> '' THEN
864 BEGIN -- use subtransaction to isolate unique constraint violations
865 INSERT INTO metabib.uncontrolled_record_attr_value ( attr, value ) VALUES ( attr_def.name, tmp_val );
866 EXCEPTION WHEN unique_violation THEN END;
870 SELECT ARRAY_AGG(id) INTO attr_vector_tmp FROM metabib.uncontrolled_record_attr_value WHERE attr = attr_def.name AND value = ANY( norm_attr_value );
872 SELECT ARRAY_AGG(id) INTO attr_vector_tmp FROM config.coded_value_map WHERE ctype = attr_def.name AND code = ANY( norm_attr_value );
875 -- Add the new value to the vector
876 attr_vector := attr_vector || attr_vector_tmp;
879 IF attr_def.sorter THEN
880 DELETE FROM metabib.record_sorter WHERE source = rid AND attr = attr_def.name;
881 IF norm_attr_value[1] IS NOT NULL THEN
882 INSERT INTO metabib.record_sorter (source, attr, value) VALUES (rid, attr_def.name, norm_attr_value[1]);
888 /* We may need to rewrite the vlist to contain
889 the intersection of new values for requested
890 attrs and old values for ignored attrs. To
891 do this, we take the old attr vlist and
892 subtract any values that are valid for the
893 requested attrs, and then add back the new
894 set of attr values. */
896 IF ARRAY_LENGTH(pattr_list, 1) > 0 THEN
897 SELECT vlist INTO attr_vector_tmp FROM metabib.record_attr_vector_list WHERE source = rid;
898 SELECT attr_vector_tmp - ARRAY_AGG(id::INT) INTO attr_vector_tmp FROM metabib.full_attr_id_map WHERE attr = ANY (pattr_list);
899 attr_vector := attr_vector || attr_vector_tmp;
902 -- On to composite attributes, now that the record attrs have been pulled. Processed in name order, so later composite
903 -- attributes can depend on earlier ones.
904 PERFORM metabib.compile_composite_attr_cache_init();
905 FOR attr_def IN SELECT * FROM config.record_attr_definition WHERE composite AND name = ANY( attr_list ) ORDER BY name LOOP
907 FOR ccvm_row IN SELECT * FROM config.coded_value_map c WHERE c.ctype = attr_def.name ORDER BY value LOOP
909 tmp_val := metabib.compile_composite_attr( ccvm_row.id );
910 CONTINUE WHEN tmp_val IS NULL OR tmp_val = ''; -- nothing to do
912 IF attr_def.filter THEN
913 IF attr_vector @@ tmp_val::query_int THEN
914 attr_vector = attr_vector + intset(ccvm_row.id);
915 EXIT WHEN NOT attr_def.multi;
919 IF attr_def.sorter THEN
920 IF attr_vector @@ tmp_val THEN
921 DELETE FROM metabib.record_sorter WHERE source = rid AND attr = attr_def.name;
922 INSERT INTO metabib.record_sorter (source, attr, value) VALUES (rid, attr_def.name, ccvm_row.code);
930 IF ARRAY_LENGTH(attr_vector, 1) > 0 THEN
931 INSERT INTO metabib.record_attr_vector_list (source, vlist) VALUES (rid, attr_vector)
932 ON CONFLICT (source) DO UPDATE SET vlist = EXCLUDED.vlist;
937 $func$ LANGUAGE PLPGSQL;
939 CREATE OR REPLACE FUNCTION authority.propagate_changes
940 (aid BIGINT, bid BIGINT) RETURNS BIGINT AS $func$
942 queuing_success BOOL := FALSE;
945 PERFORM 1 FROM config.global_flag
946 WHERE name IN ('ingest.queued.all','ingest.queued.authority.propagate')
950 -- XXX enqueue special 'propagate' bib action
951 SELECT action.enqueue_ingest_entry( bid, 'biblio', NOW(), NULL, 'propagate', aid::TEXT) INTO queuing_success;
953 IF queuing_success THEN
958 PERFORM authority.apply_propagate_changes(aid, bid);
961 $func$ LANGUAGE PLPGSQL;
963 CREATE OR REPLACE FUNCTION authority.apply_propagate_changes
964 (aid BIGINT, bid BIGINT) RETURNS BIGINT AS $func$
966 bib_forced BOOL := FALSE;
967 bib_rec biblio.record_entry%ROWTYPE;
971 SELECT INTO bib_rec * FROM biblio.record_entry WHERE id = bid;
973 new_marc := vandelay.merge_record_xml(
974 bib_rec.marc, authority.generate_overlay_template(aid));
976 IF new_marc = bib_rec.marc THEN
977 -- Authority record change had no impact on this bib record.
978 -- Nothing left to do.
982 PERFORM 1 FROM config.global_flag
983 WHERE name = 'ingest.disable_authority_auto_update_bib_meta'
987 -- update the bib record editor and edit_date
989 SELECT editor FROM authority.record_entry WHERE id = aid);
990 bib_rec.edit_date = NOW();
993 PERFORM action.set_queued_ingest_force('ingest.queued.biblio.update.disabled');
995 UPDATE biblio.record_entry SET
997 editor = bib_rec.editor,
998 edit_date = bib_rec.edit_date
1001 PERFORM action.clear_queued_ingest_force();
1006 $func$ LANGUAGE PLPGSQL;
1008 CREATE OR REPLACE FUNCTION metabib.reingest_metabib_field_entries(
1010 skip_facet BOOL DEFAULT FALSE,
1011 skip_display BOOL DEFAULT FALSE,
1012 skip_browse BOOL DEFAULT FALSE,
1013 skip_search BOOL DEFAULT FALSE,
1014 only_fields INT[] DEFAULT '{}'::INT[]
1015 ) RETURNS VOID AS $func$
1018 ind_data metabib.field_entry_template%ROWTYPE;
1019 mbe_row metabib.browse_entry%ROWTYPE;
1022 b_skip_display BOOL;
1026 field_list INT[] := only_fields;
1027 field_types TEXT[] := '{}'::TEXT[];
1030 IF field_list = '{}'::INT[] THEN
1031 SELECT ARRAY_AGG(id) INTO field_list FROM config.metabib_field;
1034 SELECT COALESCE(NULLIF(skip_facet, FALSE), EXISTS (SELECT enabled FROM config.internal_flag WHERE name = 'ingest.skip_facet_indexing' AND enabled)) INTO b_skip_facet;
1035 SELECT COALESCE(NULLIF(skip_display, FALSE), EXISTS (SELECT enabled FROM config.internal_flag WHERE name = 'ingest.skip_display_indexing' AND enabled)) INTO b_skip_display;
1036 SELECT COALESCE(NULLIF(skip_browse, FALSE), EXISTS (SELECT enabled FROM config.internal_flag WHERE name = 'ingest.skip_browse_indexing' AND enabled)) INTO b_skip_browse;
1037 SELECT COALESCE(NULLIF(skip_search, FALSE), EXISTS (SELECT enabled FROM config.internal_flag WHERE name = 'ingest.skip_search_indexing' AND enabled)) INTO b_skip_search;
1039 IF NOT b_skip_facet THEN field_types := field_types || '{facet}'; END IF;
1040 IF NOT b_skip_display THEN field_types := field_types || '{display}'; END IF;
1041 IF NOT b_skip_browse THEN field_types := field_types || '{browse}'; END IF;
1042 IF NOT b_skip_search THEN field_types := field_types || '{search}'; END IF;
1044 PERFORM * FROM config.internal_flag WHERE name = 'ingest.assume_inserts_only' AND enabled;
1046 IF NOT b_skip_search THEN
1047 FOR fclass IN SELECT * FROM config.metabib_class LOOP
1048 EXECUTE $$DELETE FROM metabib.$$ || fclass.name || $$_field_entry WHERE source = $$ || bib_id || $$ AND field = ANY($1)$$ USING field_list;
1051 IF NOT b_skip_facet THEN
1052 DELETE FROM metabib.facet_entry WHERE source = bib_id AND field = ANY(field_list);
1054 IF NOT b_skip_display THEN
1055 DELETE FROM metabib.display_entry WHERE source = bib_id AND field = ANY(field_list);
1057 IF NOT b_skip_browse THEN
1058 DELETE FROM metabib.browse_entry_def_map WHERE source = bib_id AND def = ANY(field_list);
1062 FOR ind_data IN SELECT * FROM biblio.extract_metabib_field_entry( bib_id, ' ', field_types, field_list ) LOOP
1064 -- don't store what has been normalized away
1065 CONTINUE WHEN ind_data.value IS NULL;
1067 IF ind_data.field < 0 THEN
1068 ind_data.field = -1 * ind_data.field;
1071 IF ind_data.facet_field AND NOT b_skip_facet THEN
1072 INSERT INTO metabib.facet_entry (field, source, value)
1073 VALUES (ind_data.field, ind_data.source, ind_data.value);
1076 IF ind_data.display_field AND NOT b_skip_display THEN
1077 INSERT INTO metabib.display_entry (field, source, value)
1078 VALUES (ind_data.field, ind_data.source, ind_data.value);
1082 IF ind_data.browse_field AND NOT b_skip_browse THEN
1083 -- A caveat about this SELECT: this should take care of replacing
1084 -- old mbe rows when data changes, but not if normalization (by
1085 -- which I mean specifically the output of
1086 -- evergreen.oils_tsearch2()) changes. It may or may not be
1087 -- expensive to add a comparison of index_vector to index_vector
1088 -- to the WHERE clause below.
1090 CONTINUE WHEN ind_data.sort_value IS NULL;
1092 value_prepped := metabib.browse_normalize(ind_data.value, ind_data.field);
1093 IF ind_data.browse_nocase THEN -- for "nocase" browse definions, look for a preexisting row that matches case-insensitively on value and use that
1094 SELECT INTO mbe_row * FROM metabib.browse_entry
1095 WHERE evergreen.lowercase(value) = evergreen.lowercase(value_prepped) AND sort_value = ind_data.sort_value
1096 ORDER BY sort_value, value LIMIT 1; -- gotta pick something, I guess
1099 IF mbe_row.id IS NOT NULL THEN -- asked to check for, and found, a "nocase" version to use
1100 mbe_id := mbe_row.id;
1101 ELSE -- otherwise, an UPSERT-protected variant
1102 INSERT INTO metabib.browse_entry
1103 ( value, sort_value ) VALUES
1104 ( value_prepped, ind_data.sort_value )
1105 ON CONFLICT (sort_value, value) DO UPDATE SET sort_value = EXCLUDED.sort_value -- must update a row to return an existing id
1106 RETURNING id INTO mbe_id;
1109 INSERT INTO metabib.browse_entry_def_map (entry, def, source, authority)
1110 VALUES (mbe_id, ind_data.field, ind_data.source, ind_data.authority);
1113 IF ind_data.search_field AND NOT b_skip_search THEN
1114 -- Avoid inserting duplicate rows
1115 EXECUTE 'SELECT 1 FROM metabib.' || ind_data.field_class ||
1116 '_field_entry WHERE field = $1 AND source = $2 AND value = $3'
1117 INTO mbe_id USING ind_data.field, ind_data.source, ind_data.value;
1118 -- RAISE NOTICE 'Search for an already matching row returned %', mbe_id;
1119 IF mbe_id IS NULL THEN
1121 INSERT INTO metabib.$$ || ind_data.field_class || $$_field_entry (field, source, value)
1123 quote_literal(ind_data.field) || $$, $$ ||
1124 quote_literal(ind_data.source) || $$, $$ ||
1125 quote_literal(ind_data.value) ||
1132 IF NOT b_skip_search THEN
1133 PERFORM metabib.update_combined_index_vectors(bib_id);
1134 PERFORM * FROM config.internal_flag WHERE name = 'ingest.disable_symspell_reification' AND enabled;
1136 PERFORM search.symspell_dictionary_reify();
1142 $func$ LANGUAGE PLPGSQL;
1144 -- get rid of old version
1145 DROP FUNCTION authority.indexing_ingest_or_delete;