3 INSERT INTO config.global_flag (name, enabled, label) VALUES (
4 'ingest.queued.max_threads', TRUE,
6 'ingest.queued.max_threads',
7 'Queued Ingest: Maximum number of database workers allowed for queued ingest processes',
11 'ingest.queued.abort_on_error', FALSE,
13 'ingest.queued.abort_on_error',
14 'Queued Ingest: Abort transaction on ingest error rather than simply logging an error',
18 'ingest.queued.authority.propagate', FALSE,
20 'ingest.queued.authority.propagate',
21 'Queued Ingest: Queue all bib record updates on authority change propagation, even if bib queuing is not generally enabled',
25 'ingest.queued.all', FALSE,
28 'Queued Ingest: Use Queued Ingest for all bib and authority record ingest',
32 'ingest.queued.biblio.all', FALSE,
34 'ingest.queued.biblio.all',
35 'Queued Ingest: Use Queued Ingest for all bib record ingest',
39 'ingest.queued.authority.all', FALSE,
41 'ingest.queued.authority.all',
42 'Queued Ingest: Use Queued Ingest for all authority record ingest',
46 'ingest.queued.biblio.insert.marc_edit_inline', TRUE,
48 'ingest.queued.biblio.insert.marc_edit_inline',
49 'Queued Ingest: Do NOT use Queued Ingest when creating a new bib, or undeleting a bib, via the MARC editor',
53 'ingest.queued.biblio.insert', FALSE,
55 'ingest.queued.biblio.insert',
56 'Queued Ingest: Use Queued Ingest for bib record ingest on insert and undelete',
60 'ingest.queued.authority.insert', FALSE,
62 'ingest.queued.authority.insert',
63 'Queued Ingest: Use Queued Ingest for authority record ingest on insert and undelete',
67 'ingest.queued.biblio.update.marc_edit_inline', TRUE,
69 'ingest.queued.biblio.update.marc_edit_inline',
70 'Queued Ingest: Do NOT Use Queued Ingest when editing bib records via the MARC Editor',
74 'ingest.queued.biblio.update', FALSE,
76 'ingest.queued.biblio.update',
77 'Queued Ingest: Use Queued Ingest for bib record ingest on update',
81 'ingest.queued.authority.update', FALSE,
83 'ingest.queued.authority.update',
84 'Queued Ingest: Use Queued Ingest for authority record ingest on update',
88 'ingest.queued.biblio.delete', FALSE,
90 'ingest.queued.biblio.delete',
91 'Queued Ingest: Use Queued Ingest for bib record ingest on delete',
95 'ingest.queued.authority.delete', FALSE,
97 'ingest.queued.authority.delete',
98 'Queued Ingest: Use Queued Ingest for authority record ingest on delete',
104 UPDATE config.global_flag SET value = '20' WHERE name = 'ingest.queued.max_threads';
106 CREATE OR REPLACE FUNCTION search.symspell_maintain_entries () RETURNS TRIGGER AS $f$
109 new_value TEXT := NULL;
110 old_value TEXT := NULL;
114 IF TG_TABLE_SCHEMA = 'authority' THEN
115 IF TG_OP IN ('INSERT', 'UPDATE') THEN
121 SELECT m.field_class INTO search_class
122 FROM authority.control_set_auth_field_metabib_field_map_refs a
123 JOIN config.metabib_field m ON (a.metabib_field=m.id)
124 WHERE a.authority_field = _atag;
130 search_class := COALESCE(TG_ARGV[0], SPLIT_PART(TG_TABLE_NAME,'_',1));
133 IF TG_OP IN ('INSERT', 'UPDATE') THEN
134 new_value := NEW.value;
137 IF TG_OP IN ('DELETE', 'UPDATE') THEN
138 old_value := OLD.value;
141 IF new_value = old_value THEN
144 INSERT INTO search.symspell_dictionary_updates
145 SELECT txid_current(), *
146 FROM search.symspell_build_entries(
153 -- PERFORM * FROM search.symspell_build_and_merge_entries(new_value, search_class, old_value);
155 RETURN NULL; -- always fired AFTER
157 $f$ LANGUAGE PLPGSQL;
159 CREATE TABLE action.ingest_queue (
160 id SERIAL PRIMARY KEY,
161 created TIMESTAMPTZ NOT NULL DEFAULT NOW(),
162 run_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
163 who INT REFERENCES actor.usr (id) ON UPDATE CASCADE ON DELETE SET NULL DEFERRABLE INITIALLY DEFERRED,
164 start_time TIMESTAMPTZ,
165 end_time TIMESTAMPTZ,
170 CREATE TABLE action.ingest_queue_entry (
171 id BIGSERIAL PRIMARY KEY,
172 record BIGINT NOT NULL, -- points to a record id of the appropriate record_type
173 record_type TEXT NOT NULL,
174 action TEXT NOT NULL,
175 run_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
176 state_data TEXT NOT NULL DEFAULT '',
177 queue INT REFERENCES action.ingest_queue (id) ON UPDATE CASCADE ON DELETE CASCADE DEFERRABLE INITIALLY DEFERRED,
178 override_by BIGINT REFERENCES action.ingest_queue_entry (id) ON UPDATE CASCADE ON DELETE SET NULL DEFERRABLE INITIALLY DEFERRED,
179 ingest_time TIMESTAMPTZ,
180 fail_time TIMESTAMPTZ
182 CREATE UNIQUE INDEX record_pending_once ON action.ingest_queue_entry (record_type,record,state_data) WHERE ingest_time IS NULL AND override_by IS NULL;
183 CREATE INDEX entry_override_by_idx ON action.ingest_queue_entry (override_by) WHERE override_by IS NOT NULL;
185 CREATE OR REPLACE FUNCTION action.enqueue_ingest_entry (
187 rtype TEXT DEFAULT 'biblio',
188 when_to_run TIMESTAMPTZ DEFAULT NOW(),
189 queue_id INT DEFAULT NULL,
190 ingest_action TEXT DEFAULT 'update', -- will be the most common?
191 old_state_data TEXT DEFAULT ''
192 ) RETURNS BOOL AS $F$
194 new_entry action.ingest_queue_entry%ROWTYPE;
195 prev_del_entry action.ingest_queue_entry%ROWTYPE;
200 IF ingest_action = 'delete' THEN
201 -- first see if there is an outstanding entry
202 SELECT * INTO prev_del_entry
203 FROM action.ingest_queue_entry
204 WHERE qe.record = record_id
205 AND qe.state_date = old_state_data
206 AND qe.record_type = rtype
207 AND qe.ingest_time IS NULL
208 AND qe.override_by IS NULL;
211 WITH existing_queue_entry_cte AS (
212 SELECT queue_id AS queue,
213 rtype AS record_type,
215 qe.id AS override_by,
216 ingest_action AS action,
218 old_state_data AS state_data
219 FROM action.ingest_queue_entry qe
220 JOIN action.ingest_queue q ON (qe.queue = q.id)
221 WHERE qe.record = record_id
222 AND q.end_time IS NULL
223 AND qe.record_type = rtype
224 AND qe.state_data = old_state_data
225 AND qe.ingest_time IS NULL
226 AND qe.fail_time IS NULL
227 AND qe.override_by IS NULL
228 ), existing_nonqueue_entry_cte AS (
229 SELECT queue_id AS queue,
230 rtype AS record_type,
232 qe.id AS override_by,
233 ingest_action AS action,
235 old_state_data AS state_data
236 FROM action.ingest_queue_entry qe
237 WHERE qe.record = record_id
239 AND qe.record_type = rtype
240 AND qe.state_data = old_state_data
241 AND qe.ingest_time IS NULL
242 AND qe.fail_time IS NULL
243 AND qe.override_by IS NULL
244 ), new_entry_cte AS (
245 SELECT * FROM existing_queue_entry_cte
247 SELECT * FROM existing_nonqueue_entry_cte
249 SELECT queue_id, rtype, record_id, NULL, ingest_action, COALESCE(when_to_run,NOW()), old_state_data
250 ), insert_entry_cte AS (
251 INSERT INTO action.ingest_queue_entry
252 (queue, record_type, record, override_by, action, run_at, state_data)
253 SELECT queue, record_type, record, override_by, action, run_at, state_data FROM new_entry_cte
254 ORDER BY 4 NULLS LAST, 6
257 ) SELECT * INTO new_entry FROM insert_entry_cte;
259 IF prev_del_entry.id IS NOT NULL THEN -- later delete overrides earlier unapplied entry
260 UPDATE action.ingest_queue_entry
261 SET override_by = new_entry.id
262 WHERE id = prev_del_entry.id;
264 UPDATE action.ingest_queue_entry
265 SET override_by = NULL
266 WHERE id = new_entry.id;
268 ELSIF new_entry.override_by IS NOT NULL THEN
269 RETURN TRUE; -- already handled, don't notify
272 NOTIFY queued_ingest;
275 EXCEPTION WHEN OTHERS THEN
276 GET STACKED DIAGNOSTICS diag_detail = PG_EXCEPTION_DETAIL,
277 diag_context = PG_EXCEPTION_CONTEXT;
278 RAISE WARNING '%\n%', diag_detail, diag_context;
281 $F$ LANGUAGE PLPGSQL;
283 CREATE OR REPLACE FUNCTION action.process_ingest_queue_entry (qeid BIGINT) RETURNS BOOL AS $func$
285 ingest_success BOOL := NULL;
286 qe action.ingest_queue_entry%ROWTYPE;
287 aid authority.record_entry.id%TYPE;
290 SELECT * INTO qe FROM action.ingest_queue_entry WHERE id = qeid;
291 IF qe.ingest_time IS NOT NULL OR qe.override_by IS NOT NULL THEN
292 RETURN TRUE; -- Already done
295 IF qe.action = 'delete' THEN
296 IF qe.record_type = 'biblio' THEN
297 SELECT metabib.indexing_delete(r.*, qe.state_data) INTO ingest_success FROM biblio.record_entry r WHERE r.id = qe.record;
298 ELSIF qe.record_type = 'authority' THEN
299 SELECT authority.indexing_delete(r.*, qe.state_data) INTO ingest_success FROM authority.record_entry r WHERE r.id = qe.record;
302 IF qe.record_type = 'biblio' THEN
303 IF qe.action = 'propagate' THEN
304 SELECT authority.apply_propagate_changes(qe.state_data::BIGINT, qe.record) INTO aid;
305 SELECT aid = qe.state_data::BIGINT INTO ingest_success;
307 SELECT metabib.indexing_update(r.*, qe.action = 'insert', qe.state_data) INTO ingest_success FROM biblio.record_entry r WHERE r.id = qe.record;
309 ELSIF qe.record_type = 'authority' THEN
310 SELECT authority.indexing_update(r.*, qe.action = 'insert', qe.state_data) INTO ingest_success FROM authority.record_entry r WHERE r.id = qe.record;
314 IF NOT ingest_success THEN
315 UPDATE action.ingest_queue_entry SET fail_time = NOW() WHERE id = qe.id;
316 PERFORM * FROM config.internal_flag WHERE name = 'ingest.queued.abort_on_error' AND enabled;
318 RAISE EXCEPTION 'Ingest action of % on %.record_entry % for queue entry % failed', qe.action, qe.record_type, qe.record, qe.id;
320 RAISE WARNING 'Ingest action of % on %.record_entry % for queue entry % failed', qe.action, qe.record_type, qe.record, qe.id;
323 UPDATE action.ingest_queue_entry SET ingest_time = NOW() WHERE id = qe.id;
326 RETURN ingest_success;
328 $func$ LANGUAGE PLPGSQL;
330 CREATE OR REPLACE FUNCTION action.complete_duplicated_entries () RETURNS TRIGGER AS $F$
332 IF NEW.ingest_time IS NOT NULL THEN
333 UPDATE action.ingest_queue_entry SET ingest_time = NEW.ingest_time WHERE override_by = NEW.id;
338 $F$ LANGUAGE PLPGSQL;
340 CREATE TRIGGER complete_duplicated_entries_trigger
341 AFTER UPDATE ON action.ingest_queue_entry
342 FOR EACH ROW WHEN (NEW.override_by IS NULL)
343 EXECUTE PROCEDURE action.complete_duplicated_entries();
345 CREATE OR REPLACE FUNCTION action.set_ingest_queue(INT) RETURNS VOID AS $$
346 $_SHARED{"ingest_queue_id"} = $_[0];
349 CREATE OR REPLACE FUNCTION action.get_ingest_queue() RETURNS INT AS $$
350 return $_SHARED{"ingest_queue_id"};
353 CREATE OR REPLACE FUNCTION action.clear_ingest_queue() RETURNS VOID AS $$
354 delete($_SHARED{"ingest_queue_id"});
357 CREATE OR REPLACE FUNCTION action.set_queued_ingest_force(TEXT) RETURNS VOID AS $$
358 $_SHARED{"ingest_queue_force"} = $_[0];
361 CREATE OR REPLACE FUNCTION action.get_queued_ingest_force() RETURNS TEXT AS $$
362 return $_SHARED{"ingest_queue_force"};
365 CREATE OR REPLACE FUNCTION action.clear_queued_ingest_force() RETURNS VOID AS $$
366 delete($_SHARED{"ingest_queue_force"});
369 ------------------ ingest functions ------------------
371 CREATE OR REPLACE FUNCTION metabib.indexing_delete (bib biblio.record_entry, extra TEXT DEFAULT NULL) RETURNS BOOL AS $func$
377 PERFORM * FROM config.internal_flag WHERE name = 'ingest.metarecord_mapping.preserve_on_delete' AND enabled;
380 PERFORM metabib.remap_metarecord_for_bib(bib.id, bib.fingerprint, TRUE, tmp_bool);
383 -- One needs to keep these around to support searches
384 -- with the #deleted modifier, so one should turn on the named
385 -- internal flag for that functionality.
386 DELETE FROM metabib.record_attr_vector_list WHERE source = bib.id;
389 DELETE FROM authority.bib_linking abl WHERE abl.bib = bib.id; -- Avoid updating fields in bibs that are no longer visible
390 DELETE FROM biblio.peer_bib_copy_map WHERE peer_record = bib.id; -- Separate any multi-homed items
391 DELETE FROM metabib.browse_entry_def_map WHERE source = bib.id; -- Don't auto-suggest deleted bibs
394 EXCEPTION WHEN OTHERS THEN
395 GET STACKED DIAGNOSTICS diag_detail = PG_EXCEPTION_DETAIL,
396 diag_context = PG_EXCEPTION_CONTEXT;
397 RAISE WARNING '%\n%', diag_detail, diag_context;
400 $func$ LANGUAGE PLPGSQL;
402 CREATE OR REPLACE FUNCTION metabib.indexing_update (bib biblio.record_entry, insert_only BOOL DEFAULT FALSE, extra TEXT DEFAULT NULL) RETURNS BOOL AS $func$
404 skip_facet BOOL := FALSE;
405 skip_display BOOL := FALSE;
406 skip_browse BOOL := FALSE;
407 skip_search BOOL := FALSE;
408 skip_auth BOOL := FALSE;
409 skip_full BOOL := FALSE;
410 skip_attrs BOOL := FALSE;
411 skip_luri BOOL := FALSE;
412 skip_mrmap BOOL := FALSE;
413 only_attrs TEXT[] := NULL;
414 only_fields INT[] := '{}'::INT[];
419 -- Record authority linking
420 SELECT extra LIKE '%skip_authority%' INTO skip_auth;
421 PERFORM * FROM config.internal_flag WHERE name = 'ingest.disable_authority_linking' AND enabled;
422 IF NOT FOUND AND NOT skip_auth THEN
423 PERFORM biblio.map_authority_linking( bib.id, bib.marc );
426 -- Flatten and insert the mfr data
427 SELECT extra LIKE '%skip_full_rec%' INTO skip_full;
428 PERFORM * FROM config.internal_flag WHERE name = 'ingest.disable_metabib_full_rec' AND enabled;
429 IF NOT FOUND AND NOT skip_full THEN
430 PERFORM metabib.reingest_metabib_full_rec(bib.id);
433 -- Now we pull out attribute data, which is dependent on the mfr for all but XPath-based fields
434 SELECT extra LIKE '%skip_attrs%' INTO skip_attrs;
435 PERFORM * FROM config.internal_flag WHERE name = 'ingest.disable_metabib_rec_descriptor' AND enabled;
436 IF NOT FOUND AND NOT skip_attrs THEN
437 IF extra ~ 'attr\(\s*(\w[ ,\w]*?)\s*\)' THEN
438 SELECT REGEXP_SPLIT_TO_ARRAY(
439 (REGEXP_MATCHES(extra, 'attr\(\s*(\w[ ,\w]*?)\s*\)'))[1],
444 PERFORM metabib.reingest_record_attributes(bib.id, only_attrs, bib.marc, insert_only);
447 -- Gather and insert the field entry data
448 SELECT extra LIKE '%skip_facet%' INTO skip_facet;
449 SELECT extra LIKE '%skip_display%' INTO skip_display;
450 SELECT extra LIKE '%skip_browse%' INTO skip_browse;
451 SELECT extra LIKE '%skip_search%' INTO skip_search;
453 IF extra ~ 'field_list\(\s*(\d[ ,\d]+)\s*\)' THEN
454 SELECT REGEXP_SPLIT_TO_ARRAY(
455 (REGEXP_MATCHES(extra, 'field_list\(\s*(\d[ ,\d]+)\s*\)'))[1],
457 )::INT[] INTO only_fields;
460 IF NOT skip_facet OR NOT skip_display OR NOT skip_browse OR NOT skip_search THEN
461 PERFORM metabib.reingest_metabib_field_entries(bib.id, skip_facet, skip_display, skip_browse, skip_search, only_fields);
465 SELECT extra LIKE '%skip_luri%' INTO skip_luri;
466 PERFORM * FROM config.internal_flag WHERE name = 'ingest.disable_located_uri' AND enabled;
467 IF NOT FOUND AND NOT skip_luri THEN PERFORM biblio.extract_located_uris( bib.id, bib.marc, bib.editor ); END IF;
469 -- (re)map metarecord-bib linking
470 SELECT extra LIKE '%skip_mrmap%' INTO skip_mrmap;
471 IF insert_only THEN -- if not deleted and performing an insert, check for the flag
472 PERFORM * FROM config.internal_flag WHERE name = 'ingest.metarecord_mapping.skip_on_insert' AND enabled;
473 IF NOT FOUND AND NOT skip_mrmap THEN
474 PERFORM metabib.remap_metarecord_for_bib( bib.id, bib.fingerprint );
476 ELSE -- we're doing an update, and we're not deleted, remap
477 PERFORM * FROM config.internal_flag WHERE name = 'ingest.metarecord_mapping.skip_on_update' AND enabled;
478 IF NOT FOUND AND NOT skip_mrmap THEN
479 PERFORM metabib.remap_metarecord_for_bib( bib.id, bib.fingerprint );
484 EXCEPTION WHEN OTHERS THEN
485 GET STACKED DIAGNOSTICS diag_detail = PG_EXCEPTION_DETAIL,
486 diag_context = PG_EXCEPTION_CONTEXT;
487 RAISE WARNING '%\n%', diag_detail, diag_context;
490 $func$ LANGUAGE PLPGSQL;
492 CREATE OR REPLACE FUNCTION authority.indexing_delete (auth authority.record_entry, extra TEXT DEFAULT NULL) RETURNS BOOL AS $func$
498 DELETE FROM authority.bib_linking WHERE authority = NEW.id; -- Avoid updating fields in bibs that are no longer visible
499 DELETE FROM authority.full_rec WHERE record = NEW.id; -- Avoid validating fields against deleted authority records
500 DELETE FROM authority.simple_heading WHERE record = NEW.id;
501 -- Should remove matching $0 from controlled fields at the same time?
503 -- XXX What do we about the actual linking subfields present in
504 -- authority records that target this one when this happens?
505 DELETE FROM authority.authority_linking WHERE source = NEW.id OR target = NEW.id;
508 EXCEPTION WHEN OTHERS THEN
509 GET STACKED DIAGNOSTICS diag_detail = PG_EXCEPTION_DETAIL,
510 diag_context = PG_EXCEPTION_CONTEXT;
511 RAISE WARNING '%\n%', diag_detail, diag_context;
514 $func$ LANGUAGE PLPGSQL;
517 CREATE OR REPLACE FUNCTION authority.indexing_update (auth authority.record_entry, insert_only BOOL DEFAULT FALSE, old_heading TEXT DEFAULT NULL) RETURNS BOOL AS $func$
519 ashs authority.simple_heading%ROWTYPE;
520 mbe_row metabib.browse_entry%ROWTYPE;
527 -- Unless there's a setting stopping us, propagate these updates to any linked bib records when the heading changes
528 PERFORM * FROM config.internal_flag WHERE name = 'ingest.disable_authority_auto_update' AND enabled;
530 IF NOT FOUND AND auth.heading <> old_heading THEN
531 PERFORM authority.propagate_changes(auth.id);
534 IF NOT insert_only THEN
535 DELETE FROM authority.authority_linking WHERE source = auth.id;
536 DELETE FROM authority.simple_heading WHERE record = auth.id;
539 INSERT INTO authority.authority_linking (source, target, field)
540 SELECT source, target, field FROM authority.calculate_authority_linking(
541 auth.id, auth.control_set, auth.marc::XML
544 FOR ashs IN SELECT * FROM authority.simple_heading_set(auth.marc) LOOP
546 INSERT INTO authority.simple_heading (record,atag,value,sort_value,thesaurus)
547 VALUES (ashs.record, ashs.atag, ashs.value, ashs.sort_value, ashs.thesaurus);
548 ash_id := CURRVAL('authority.simple_heading_id_seq'::REGCLASS);
550 SELECT INTO mbe_row * FROM metabib.browse_entry
551 WHERE value = ashs.value AND sort_value = ashs.sort_value;
554 mbe_id := mbe_row.id;
556 INSERT INTO metabib.browse_entry
557 ( value, sort_value ) VALUES
558 ( ashs.value, ashs.sort_value );
560 mbe_id := CURRVAL('metabib.browse_entry_id_seq'::REGCLASS);
563 INSERT INTO metabib.browse_entry_simple_heading_map (entry,simple_heading) VALUES (mbe_id,ash_id);
567 -- Flatten and insert the afr data
568 PERFORM * FROM config.internal_flag WHERE name = 'ingest.disable_authority_full_rec' AND enabled;
570 PERFORM authority.reingest_authority_full_rec(auth.id);
571 PERFORM * FROM config.internal_flag WHERE name = 'ingest.disable_authority_rec_descriptor' AND enabled;
573 PERFORM authority.reingest_authority_rec_descriptor(auth.id);
577 PERFORM * FROM config.internal_flag WHERE name = 'ingest.disable_symspell_reification' AND enabled;
579 PERFORM search.symspell_dictionary_reify();
583 EXCEPTION WHEN OTHERS THEN
584 GET STACKED DIAGNOSTICS diag_detail = PG_EXCEPTION_DETAIL,
585 diag_context = PG_EXCEPTION_CONTEXT;
586 RAISE WARNING '%\n%', diag_detail, diag_context;
589 $func$ LANGUAGE PLPGSQL;
591 CREATE OR REPLACE FUNCTION evergreen.indexing_ingest_or_delete () RETURNS TRIGGER AS $func$
593 old_state_data TEXT := '';
596 queuing_flag_name TEXT;
597 queuing_flag BOOL := FALSE;
598 queuing_success BOOL := FALSE;
599 ingest_success BOOL := FALSE;
603 -- Identify the ingest action type
604 IF TG_OP = 'UPDATE' THEN
606 -- Gather type-specific data for later use
607 IF TG_TABLE_SCHEMA = 'authority' THEN
608 old_state_data = OLD.heading;
611 IF NOT OLD.deleted THEN -- maybe reingest?
613 new_action = 'delete'; -- nope, delete
615 new_action = 'update'; -- yes, update
617 ELSIF NOT NEW.deleted THEN
618 new_action = 'insert'; -- revivify, AKA insert
620 RETURN NEW; -- was and is still deleted, don't ingest
622 ELSIF TG_OP = 'INSERT' THEN
623 new_action = 'insert'; -- brand new
625 RETURN OLD; -- really deleting the record
628 queuing_flag_name := 'ingest.queued.'||TG_TABLE_SCHEMA||'.'||new_action;
629 -- See if we should be queuing anything
630 SELECT enabled INTO queuing_flag
631 FROM config.internal_flag
632 WHERE name IN ('ingest.queued.all','ingest.queued.'||TG_TABLE_SCHEMA||'.all', queuing_flag_name)
636 SELECT action.get_queued_ingest_force() INTO queuing_force;
637 IF queuing_flag IS NULL AND queuing_force = queuing_flag_name THEN
638 queuing_flag := TRUE;
641 -- you (or part of authority propagation) can forcibly disable specific queuing actions
642 IF queuing_force = queuing_flag_name||'.disabled' THEN
643 queuing_flag := FALSE;
646 -- And if we should be queuing ...
648 ingest_queue := action.get_ingest_queue();
650 -- ... but this is NOT a named or forced queue request (marc editor update, say, or vandelay overlay)...
651 IF queuing_force IS NULL AND ingest_queue IS NULL AND new_action = 'update' THEN -- re-ingest?
653 PERFORM * FROM config.internal_flag WHERE name = 'ingest.reingest.force_on_same_marc' AND enabled;
655 -- ... then don't do anything if ingest.reingest.force_on_same_marc is not enabled and the MARC hasn't changed
656 IF NOT FOUND AND OLD.marc = NEW.marc THEN
661 -- Otherwise, attempt to enqueue
662 SELECT action.enqueue_ingest_entry( NEW.id, TG_TABLE_SCHEMA, NOW(), ingest_queue, new_action, old_state_data) INTO queuing_success;
665 -- If queuing was not requested, or failed for some reason, do it live.
666 IF NOT queuing_success THEN
668 RAISE WARNING 'Enqueuing of %.record_entry % for ingest failed, attempting direct ingest', TG_TABLE_SCHEMA, NEW.id;
671 IF new_action = 'delete' THEN
672 IF TG_TABLE_SCHEMA = 'biblio' THEN
673 SELECT metabib.indexing_delete(NEW.*, old_state_data) INTO ingest_success;
674 ELSIF TG_TABLE_SCHEMA = 'authority' THEN
675 SELECT authority.indexing_delete(NEW.*, old_state_data) INTO ingest_success;
678 IF TG_TABLE_SCHEMA = 'biblio' THEN
679 SELECT metabib.indexing_update(NEW.*, new_action = 'insert', old_state_data) INTO ingest_success;
680 ELSIF TG_TABLE_SCHEMA = 'authority' THEN
681 SELECT authority.indexing_update(NEW.*, new_action = 'insert', old_state_data) INTO ingest_success;
685 IF NOT ingest_success THEN
686 PERFORM * FROM config.internal_flag WHERE name = 'ingest.queued.abort_on_error' AND enabled;
688 RAISE EXCEPTION 'Ingest of %.record_entry % failed', TG_TABLE_SCHEMA, NEW.id;
690 RAISE WARNING 'Ingest of %.record_entry % failed', TG_TABLE_SCHEMA, NEW.id;
697 $func$ LANGUAGE PLPGSQL;
699 DROP TRIGGER aaa_indexing_ingest_or_delete ON biblio.record_entry;
700 DROP TRIGGER aaa_auth_ingest_or_delete ON authority.record_entry;
702 CREATE TRIGGER aaa_indexing_ingest_or_delete AFTER INSERT OR UPDATE ON biblio.record_entry FOR EACH ROW EXECUTE PROCEDURE evergreen.indexing_ingest_or_delete ();
703 CREATE TRIGGER aaa_auth_ingest_or_delete AFTER INSERT OR UPDATE ON authority.record_entry FOR EACH ROW EXECUTE PROCEDURE evergreen.indexing_ingest_or_delete ();
705 CREATE OR REPLACE FUNCTION metabib.reingest_record_attributes (rid BIGINT, pattr_list TEXT[] DEFAULT NULL, prmarc TEXT DEFAULT NULL, rdeleted BOOL DEFAULT TRUE) RETURNS VOID AS $func$
707 transformed_xml TEXT;
708 rmarc TEXT := prmarc;
712 xfrm config.xml_transform%ROWTYPE;
713 attr_vector INT[] := '{}'::INT[];
714 attr_vector_tmp INT[];
715 attr_list TEXT[] := pattr_list;
717 norm_attr_value TEXT[];
720 attr_def config.record_attr_definition%ROWTYPE;
721 ccvm_row config.coded_value_map%ROWTYPE;
725 IF attr_list IS NULL OR rdeleted THEN -- need to do the full dance on INSERT or undelete
726 SELECT ARRAY_AGG(name) INTO attr_list FROM config.record_attr_definition
729 fixed_field IS NOT NULL OR
731 phys_char_sf IS NOT NULL OR
738 IF rmarc IS NULL THEN
739 SELECT marc INTO rmarc FROM biblio.record_entry WHERE id = rid;
742 FOR attr_def IN SELECT * FROM config.record_attr_definition WHERE NOT composite AND name = ANY( attr_list ) ORDER BY format LOOP
744 jump_past := FALSE; -- This gets set when we are non-multi and have found something
745 attr_value := '{}'::TEXT[];
746 norm_attr_value := '{}'::TEXT[];
747 attr_vector_tmp := '{}'::INT[];
749 SELECT * INTO ccvm_row FROM config.coded_value_map c WHERE c.ctype = attr_def.name LIMIT 1;
751 IF attr_def.tag IS NOT NULL THEN -- tag (and optional subfield list) selection
752 SELECT ARRAY_AGG(value) INTO attr_value
753 FROM (SELECT * FROM metabib.full_rec ORDER BY tag, subfield) AS x
755 AND tag LIKE attr_def.tag
757 WHEN attr_def.sf_list IS NOT NULL
758 THEN POSITION(subfield IN attr_def.sf_list) > 0
764 IF NOT attr_def.multi THEN
765 attr_value := ARRAY[ARRAY_TO_STRING(attr_value, COALESCE(attr_def.joiner,' '))];
770 IF NOT jump_past AND attr_def.fixed_field IS NOT NULL THEN -- a named fixed field, see config.marc21_ff_pos_map.fixed_field
771 attr_value := attr_value || vandelay.marc21_extract_fixed_field_list(rmarc, attr_def.fixed_field);
773 IF NOT attr_def.multi THEN
774 attr_value := ARRAY[attr_value[1]];
779 IF NOT jump_past AND attr_def.xpath IS NOT NULL THEN -- and xpath expression
781 SELECT INTO xfrm * FROM config.xml_transform WHERE name = attr_def.format;
783 -- See if we can skip the XSLT ... it's expensive
784 IF prev_xfrm IS NULL OR prev_xfrm <> xfrm.name THEN
785 -- Can't skip the transform
786 IF xfrm.xslt <> '---' THEN
787 transformed_xml := oils_xslt_process(rmarc,xfrm.xslt);
789 transformed_xml := rmarc;
792 prev_xfrm := xfrm.name;
795 IF xfrm.name IS NULL THEN
796 -- just grab the marcxml (empty) transform
797 SELECT INTO xfrm * FROM config.xml_transform WHERE xslt = '---' LIMIT 1;
798 prev_xfrm := xfrm.name;
801 FOR tmp_xml IN SELECT UNNEST(oils_xpath(attr_def.xpath, transformed_xml, ARRAY[ARRAY[xfrm.prefix, xfrm.namespace_uri]])) LOOP
802 tmp_val := oils_xpath_string(
805 COALESCE(attr_def.joiner,' '),
806 ARRAY[ARRAY[xfrm.prefix, xfrm.namespace_uri]]
808 IF tmp_val IS NOT NULL AND BTRIM(tmp_val) <> '' THEN
809 attr_value := attr_value || tmp_val;
810 EXIT WHEN NOT attr_def.multi;
815 IF NOT jump_past AND attr_def.phys_char_sf IS NOT NULL THEN -- a named Physical Characteristic, see config.marc21_physical_characteristic_*_map
816 SELECT ARRAY_AGG(m.value) INTO tmp_array
817 FROM vandelay.marc21_physical_characteristics(rmarc) v
818 LEFT JOIN config.marc21_physical_characteristic_value_map m ON (m.id = v.value)
819 WHERE v.subfield = attr_def.phys_char_sf AND (m.value IS NOT NULL AND BTRIM(m.value) <> '')
820 AND ( ccvm_row.id IS NULL OR ( ccvm_row.id IS NOT NULL AND v.id IS NOT NULL) );
822 attr_value := attr_value || tmp_array;
824 IF NOT attr_def.multi THEN
825 attr_value := ARRAY[attr_value[1]];
830 -- apply index normalizers to attr_value
831 FOR tmp_val IN SELECT value FROM UNNEST(attr_value) x(value) LOOP
833 SELECT n.func AS func,
834 n.param_count AS param_count,
836 FROM config.index_normalizer n
837 JOIN config.record_attr_index_norm_map m ON (m.norm = n.id)
838 WHERE attr = attr_def.name
840 EXECUTE 'SELECT ' || normalizer.func || '(' ||
841 COALESCE( quote_literal( tmp_val ), 'NULL' ) ||
843 WHEN normalizer.param_count > 0
844 THEN ',' || REPLACE(REPLACE(BTRIM(normalizer.params,'[]'),E'\'',E'\\\''),E'"',E'\'')
850 IF tmp_val IS NOT NULL AND tmp_val <> '' THEN
851 -- note that a string that contains only blanks
852 -- is a valid value for some attributes
853 norm_attr_value := norm_attr_value || tmp_val;
857 IF attr_def.filter THEN
858 -- Create unknown uncontrolled values and find the IDs of the values
859 IF ccvm_row.id IS NULL THEN
860 FOR tmp_val IN SELECT value FROM UNNEST(norm_attr_value) x(value) LOOP
861 IF tmp_val IS NOT NULL AND BTRIM(tmp_val) <> '' THEN
862 BEGIN -- use subtransaction to isolate unique constraint violations
863 INSERT INTO metabib.uncontrolled_record_attr_value ( attr, value ) VALUES ( attr_def.name, tmp_val );
864 EXCEPTION WHEN unique_violation THEN END;
868 SELECT ARRAY_AGG(id) INTO attr_vector_tmp FROM metabib.uncontrolled_record_attr_value WHERE attr = attr_def.name AND value = ANY( norm_attr_value );
870 SELECT ARRAY_AGG(id) INTO attr_vector_tmp FROM config.coded_value_map WHERE ctype = attr_def.name AND code = ANY( norm_attr_value );
873 -- Add the new value to the vector
874 attr_vector := attr_vector || attr_vector_tmp;
877 IF attr_def.sorter THEN
878 DELETE FROM metabib.record_sorter WHERE source = rid AND attr = attr_def.name;
879 IF norm_attr_value[1] IS NOT NULL THEN
880 INSERT INTO metabib.record_sorter (source, attr, value) VALUES (rid, attr_def.name, norm_attr_value[1]);
886 /* We may need to rewrite the vlist to contain
887 the intersection of new values for requested
888 attrs and old values for ignored attrs. To
889 do this, we take the old attr vlist and
890 subtract any values that are valid for the
891 requested attrs, and then add back the new
892 set of attr values. */
894 IF ARRAY_LENGTH(pattr_list, 1) > 0 THEN
895 SELECT vlist INTO attr_vector_tmp FROM metabib.record_attr_vector_list WHERE source = rid;
896 SELECT attr_vector_tmp - ARRAY_AGG(id::INT) INTO attr_vector_tmp FROM metabib.full_attr_id_map WHERE attr = ANY (pattr_list);
897 attr_vector := attr_vector || attr_vector_tmp;
900 -- On to composite attributes, now that the record attrs have been pulled. Processed in name order, so later composite
901 -- attributes can depend on earlier ones.
902 PERFORM metabib.compile_composite_attr_cache_init();
903 FOR attr_def IN SELECT * FROM config.record_attr_definition WHERE composite AND name = ANY( attr_list ) ORDER BY name LOOP
905 FOR ccvm_row IN SELECT * FROM config.coded_value_map c WHERE c.ctype = attr_def.name ORDER BY value LOOP
907 tmp_val := metabib.compile_composite_attr( ccvm_row.id );
908 CONTINUE WHEN tmp_val IS NULL OR tmp_val = ''; -- nothing to do
910 IF attr_def.filter THEN
911 IF attr_vector @@ tmp_val::query_int THEN
912 attr_vector = attr_vector + intset(ccvm_row.id);
913 EXIT WHEN NOT attr_def.multi;
917 IF attr_def.sorter THEN
918 IF attr_vector @@ tmp_val THEN
919 DELETE FROM metabib.record_sorter WHERE source = rid AND attr = attr_def.name;
920 INSERT INTO metabib.record_sorter (source, attr, value) VALUES (rid, attr_def.name, ccvm_row.code);
928 IF ARRAY_LENGTH(attr_vector, 1) > 0 THEN
929 INSERT INTO metabib.record_attr_vector_list (source, vlist) VALUES (rid, attr_vector)
930 ON CONFLICT (source) DO UPDATE SET vlist = EXCLUDED.vlist;
935 $func$ LANGUAGE PLPGSQL;
937 CREATE OR REPLACE FUNCTION authority.propagate_changes
938 (aid BIGINT, bid BIGINT) RETURNS BIGINT AS $func$
940 queuing_success BOOL := FALSE;
943 PERFORM 1 FROM config.global_flag
944 WHERE name IN ('ingest.queued.all','ingest.queued.authority.propagate')
948 -- XXX enqueue special 'propagate' bib action
949 SELECT action.enqueue_ingest_entry( bid, 'biblio', NOW(), NULL, 'propagate', aid::TEXT) INTO queuing_success;
951 IF queuing_success THEN
956 PERFORM authority.apply_propagate_changes(aid, bid);
959 $func$ LANGUAGE PLPGSQL;
961 CREATE OR REPLACE FUNCTION authority.apply_propagate_changes
962 (aid BIGINT, bid BIGINT) RETURNS BIGINT AS $func$
964 bib_forced BOOL := FALSE;
965 bib_rec biblio.record_entry%ROWTYPE;
969 SELECT INTO bib_rec * FROM biblio.record_entry WHERE id = bid;
971 new_marc := vandelay.merge_record_xml(
972 bib_rec.marc, authority.generate_overlay_template(aid));
974 IF new_marc = bib_rec.marc THEN
975 -- Authority record change had no impact on this bib record.
976 -- Nothing left to do.
980 PERFORM 1 FROM config.global_flag
981 WHERE name = 'ingest.disable_authority_auto_update_bib_meta'
985 -- update the bib record editor and edit_date
987 SELECT editor FROM authority.record_entry WHERE id = aid);
988 bib_rec.edit_date = NOW();
991 PERFORM action.set_queued_ingest_force('ingest.queued.biblio.update.disabled');
993 UPDATE biblio.record_entry SET
995 editor = bib_rec.editor,
996 edit_date = bib_rec.edit_date
999 PERFORM action.clear_queued_ingest_force();
1004 $func$ LANGUAGE PLPGSQL;
1006 CREATE OR REPLACE FUNCTION metabib.reingest_metabib_field_entries(
1008 skip_facet BOOL DEFAULT FALSE,
1009 skip_display BOOL DEFAULT FALSE,
1010 skip_browse BOOL DEFAULT FALSE,
1011 skip_search BOOL DEFAULT FALSE,
1012 only_fields INT[] DEFAULT '{}'::INT[]
1013 ) RETURNS VOID AS $func$
1016 ind_data metabib.field_entry_template%ROWTYPE;
1017 mbe_row metabib.browse_entry%ROWTYPE;
1020 b_skip_display BOOL;
1024 field_list INT[] := only_fields;
1025 field_types TEXT[] := '{}'::TEXT[];
1028 IF field_list = '{}'::INT[] THEN
1029 SELECT ARRAY_AGG(id) INTO field_list FROM config.metabib_field;
1032 SELECT COALESCE(NULLIF(skip_facet, FALSE), EXISTS (SELECT enabled FROM config.internal_flag WHERE name = 'ingest.skip_facet_indexing' AND enabled)) INTO b_skip_facet;
1033 SELECT COALESCE(NULLIF(skip_display, FALSE), EXISTS (SELECT enabled FROM config.internal_flag WHERE name = 'ingest.skip_display_indexing' AND enabled)) INTO b_skip_display;
1034 SELECT COALESCE(NULLIF(skip_browse, FALSE), EXISTS (SELECT enabled FROM config.internal_flag WHERE name = 'ingest.skip_browse_indexing' AND enabled)) INTO b_skip_browse;
1035 SELECT COALESCE(NULLIF(skip_search, FALSE), EXISTS (SELECT enabled FROM config.internal_flag WHERE name = 'ingest.skip_search_indexing' AND enabled)) INTO b_skip_search;
1037 IF NOT b_skip_facet THEN field_types := field_types || '{facet}'; END IF;
1038 IF NOT b_skip_display THEN field_types := field_types || '{display}'; END IF;
1039 IF NOT b_skip_browse THEN field_types := field_types || '{browse}'; END IF;
1040 IF NOT b_skip_search THEN field_types := field_types || '{search}'; END IF;
1042 PERFORM * FROM config.internal_flag WHERE name = 'ingest.assume_inserts_only' AND enabled;
1044 IF NOT b_skip_search THEN
1045 FOR fclass IN SELECT * FROM config.metabib_class LOOP
1046 EXECUTE $$DELETE FROM metabib.$$ || fclass.name || $$_field_entry WHERE source = $$ || bib_id || $$ AND field = ANY($1)$$ USING field_list;
1049 IF NOT b_skip_facet THEN
1050 DELETE FROM metabib.facet_entry WHERE source = bib_id AND field = ANY(field_list);
1052 IF NOT b_skip_display THEN
1053 DELETE FROM metabib.display_entry WHERE source = bib_id AND field = ANY(field_list);
1055 IF NOT b_skip_browse THEN
1056 DELETE FROM metabib.browse_entry_def_map WHERE source = bib_id AND def = ANY(field_list);
1060 FOR ind_data IN SELECT * FROM biblio.extract_metabib_field_entry( bib_id, ' ', field_types, field_list ) LOOP
1062 -- don't store what has been normalized away
1063 CONTINUE WHEN ind_data.value IS NULL;
1065 IF ind_data.field < 0 THEN
1066 ind_data.field = -1 * ind_data.field;
1069 IF ind_data.facet_field AND NOT b_skip_facet THEN
1070 INSERT INTO metabib.facet_entry (field, source, value)
1071 VALUES (ind_data.field, ind_data.source, ind_data.value);
1074 IF ind_data.display_field AND NOT b_skip_display THEN
1075 INSERT INTO metabib.display_entry (field, source, value)
1076 VALUES (ind_data.field, ind_data.source, ind_data.value);
1080 IF ind_data.browse_field AND NOT b_skip_browse THEN
1081 -- A caveat about this SELECT: this should take care of replacing
1082 -- old mbe rows when data changes, but not if normalization (by
1083 -- which I mean specifically the output of
1084 -- evergreen.oils_tsearch2()) changes. It may or may not be
1085 -- expensive to add a comparison of index_vector to index_vector
1086 -- to the WHERE clause below.
1088 CONTINUE WHEN ind_data.sort_value IS NULL;
1090 value_prepped := metabib.browse_normalize(ind_data.value, ind_data.field);
1091 IF ind_data.browse_nocase THEN -- for "nocase" browse definions, look for a preexisting row that matches case-insensitively on value and use that
1092 SELECT INTO mbe_row * FROM metabib.browse_entry
1093 WHERE evergreen.lowercase(value) = evergreen.lowercase(value_prepped) AND sort_value = ind_data.sort_value
1094 ORDER BY sort_value, value LIMIT 1; -- gotta pick something, I guess
1097 IF mbe_row.id IS NOT NULL THEN -- asked to check for, and found, a "nocase" version to use
1098 mbe_id := mbe_row.id;
1099 ELSE -- otherwise, an UPSERT-protected variant
1100 INSERT INTO metabib.browse_entry
1101 ( value, sort_value ) VALUES
1102 ( value_prepped, ind_data.sort_value )
1103 ON CONFLICT (sort_value, value) DO UPDATE SET sort_value = EXCLUDED.sort_value -- must update a row to return an existing id
1104 RETURNING id INTO mbe_id;
1107 INSERT INTO metabib.browse_entry_def_map (entry, def, source, authority)
1108 VALUES (mbe_id, ind_data.field, ind_data.source, ind_data.authority);
1111 IF ind_data.search_field AND NOT b_skip_search THEN
1112 -- Avoid inserting duplicate rows
1113 EXECUTE 'SELECT 1 FROM metabib.' || ind_data.field_class ||
1114 '_field_entry WHERE field = $1 AND source = $2 AND value = $3'
1115 INTO mbe_id USING ind_data.field, ind_data.source, ind_data.value;
1116 -- RAISE NOTICE 'Search for an already matching row returned %', mbe_id;
1117 IF mbe_id IS NULL THEN
1119 INSERT INTO metabib.$$ || ind_data.field_class || $$_field_entry (field, source, value)
1121 quote_literal(ind_data.field) || $$, $$ ||
1122 quote_literal(ind_data.source) || $$, $$ ||
1123 quote_literal(ind_data.value) ||
1130 IF NOT b_skip_search THEN
1131 PERFORM metabib.update_combined_index_vectors(bib_id);
1132 PERFORM * FROM config.internal_flag WHERE name = 'ingest.disable_symspell_reification' AND enabled;
1134 PERFORM search.symspell_dictionary_reify();
1140 $func$ LANGUAGE PLPGSQL;
1142 -- get rid of old version
1143 DROP FUNCTION authority.indexing_ingest_or_delete;