From 12dd39342fd10e23c5ec16f1c0cf8fcbc1d24afb Mon Sep 17 00:00:00 2001 From: James Agnew Date: Tue, 7 Jan 2025 06:59:42 -0500 Subject: [PATCH] Optmize prefetch for mass ingestion transactions (#6589) * Optmize prefetch for mass ingestion transactions * Add changelog * Spotless * Resolve review comments and spotless --- ...timize_transactions_in_mass_ingestion_mode | 4 + .../fhir/jpa/dao/BaseHapiFhirSystemDao.java | 34 ++- .../fhir/jpa/dao/TransactionProcessor.java | 2 +- .../r4/FhirResourceDaoR4QueryCountTest.java | 232 +++++++++++------- .../jpa/dao/r4/PartitioningSqlR4Test.java | 6 +- .../WebsocketWithSubscriptionIdR5Test.java | 8 +- 6 files changed, 182 insertions(+), 104 deletions(-) create mode 100644 hapi-fhir-docs/src/main/resources/ca/uhn/hapi/fhir/changelog/7_8_0/6589_optimize_transactions_in_mass_ingestion_mode diff --git a/hapi-fhir-docs/src/main/resources/ca/uhn/hapi/fhir/changelog/7_8_0/6589_optimize_transactions_in_mass_ingestion_mode b/hapi-fhir-docs/src/main/resources/ca/uhn/hapi/fhir/changelog/7_8_0/6589_optimize_transactions_in_mass_ingestion_mode new file mode 100644 index 00000000000..871d01991ba --- /dev/null +++ b/hapi-fhir-docs/src/main/resources/ca/uhn/hapi/fhir/changelog/7_8_0/6589_optimize_transactions_in_mass_ingestion_mode @@ -0,0 +1,4 @@ +--- +type: perf +issue: 6589 +title: "When performing data loading into a JPA repository using FHIR transactions with Mass Ingestion Mode enabled, the prefetch routine has been optimized to avoid loading the current resource body/contents, since these are not actually needed in Mass Ingestion mode. This avoids a redundant select statement being issued for each transaction and should improve performance." diff --git a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/dao/BaseHapiFhirSystemDao.java b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/dao/BaseHapiFhirSystemDao.java index dd121e4459e..1a0ff20261a 100644 --- a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/dao/BaseHapiFhirSystemDao.java +++ b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/dao/BaseHapiFhirSystemDao.java @@ -206,12 +206,42 @@ public abstract class BaseHapiFhirSystemDao extends B * However, for realistic average workloads, this should reduce the number of round trips. */ if (!idChunk.isEmpty()) { - List entityChunk = prefetchResourceTableAndHistory(idChunk); + List entityChunk = null; + + /* + * Unless we're in Mass Ingestion mode, we will pre-fetch the current + * saved resource text in HFJ_RES_VER (ResourceHistoryTable). If we're + * in Mass Ingestion Mode, we don't need to do that because every update + * will generate a new version anyway so the system never needs to know + * the current contents. + */ + if (!myStorageSettings.isMassIngestionMode()) { + entityChunk = prefetchResourceTableAndHistory(idChunk); + } if (thePreFetchIndexes) { + /* + * If we're in mass ingestion mode, then we still need to load the resource + * entries in HFJ_RESOURCE (ResourceTable). We combine that with the search + * for tokens (since token is the most likely kind of index to be populated + * for any arbitrary resource type). + * + * For all other index types, we only load indexes if at least one + * HFJ_RESOURCE row indicates that a resource we care about actually has + * index rows of the given type. + */ + if (entityChunk == null) { + String jqlQuery = + "SELECT r FROM ResourceTable r LEFT JOIN FETCH r.myParamsToken WHERE r.myPid IN ( :IDS )"; + TypedQuery query = myEntityManager.createQuery(jqlQuery, ResourceTable.class); + query.setParameter("IDS", idChunk); + entityChunk = query.getResultList(); + } else { + prefetchByField("token", "myParamsToken", ResourceTable::isParamsTokenPopulated, entityChunk); + } + prefetchByField("string", "myParamsString", ResourceTable::isParamsStringPopulated, entityChunk); - prefetchByField("token", "myParamsToken", ResourceTable::isParamsTokenPopulated, entityChunk); prefetchByField("date", "myParamsDate", ResourceTable::isParamsDatePopulated, entityChunk); prefetchByField( "quantity", "myParamsQuantity", ResourceTable::isParamsQuantityPopulated, entityChunk); diff --git a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/dao/TransactionProcessor.java b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/dao/TransactionProcessor.java index 6312c7ba085..16837edded6 100644 --- a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/dao/TransactionProcessor.java +++ b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/dao/TransactionProcessor.java @@ -173,7 +173,7 @@ public class TransactionProcessor extends BaseTransactionProcessor { * is for fast writing of data. * * Note that it's probably not necessary to reset it back, it should - * automatically go back to the default value after the transaction but + * automatically go back to the default value after the transaction, but * we reset it just to be safe. */ FlushModeType initialFlushMode = myEntityManager.getFlushMode(); diff --git a/hapi-fhir-jpaserver-test-r4/src/test/java/ca/uhn/fhir/jpa/dao/r4/FhirResourceDaoR4QueryCountTest.java b/hapi-fhir-jpaserver-test-r4/src/test/java/ca/uhn/fhir/jpa/dao/r4/FhirResourceDaoR4QueryCountTest.java index c84f4b5af5e..0071e50e681 100644 --- a/hapi-fhir-jpaserver-test-r4/src/test/java/ca/uhn/fhir/jpa/dao/r4/FhirResourceDaoR4QueryCountTest.java +++ b/hapi-fhir-jpaserver-test-r4/src/test/java/ca/uhn/fhir/jpa/dao/r4/FhirResourceDaoR4QueryCountTest.java @@ -20,7 +20,6 @@ import ca.uhn.fhir.context.support.ValueSetExpansionOptions; import ca.uhn.fhir.jpa.api.config.JpaStorageSettings; import ca.uhn.fhir.jpa.api.dao.IFhirResourceDao; import ca.uhn.fhir.jpa.api.dao.ReindexParameters; -import ca.uhn.fhir.jpa.api.model.DaoMethodOutcome; import ca.uhn.fhir.jpa.api.model.DeleteMethodOutcome; import ca.uhn.fhir.jpa.api.model.ExpungeOptions; import ca.uhn.fhir.jpa.api.model.HistoryCountModeEnum; @@ -28,6 +27,7 @@ import ca.uhn.fhir.jpa.dao.data.ISearchParamPresentDao; import ca.uhn.fhir.jpa.entity.TermValueSet; import ca.uhn.fhir.jpa.entity.TermValueSetPreExpansionStatusEnum; import ca.uhn.fhir.jpa.interceptor.ForceOffsetSearchModeInterceptor; +import ca.uhn.fhir.jpa.model.dao.JpaPid; import ca.uhn.fhir.jpa.model.entity.ResourceTable; import ca.uhn.fhir.jpa.model.util.JpaConstants; import ca.uhn.fhir.jpa.provider.BaseResourceProviderR4Test; @@ -80,6 +80,7 @@ import org.hl7.fhir.r4.model.Location; import org.hl7.fhir.r4.model.Narrative; import org.hl7.fhir.r4.model.Observation; import org.hl7.fhir.r4.model.OperationOutcome; +import org.hl7.fhir.r4.model.Organization; import org.hl7.fhir.r4.model.Parameters; import org.hl7.fhir.r4.model.Patient; import org.hl7.fhir.r4.model.Practitioner; @@ -107,11 +108,11 @@ import org.mockito.Mock; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.data.domain.PageRequest; import org.springframework.data.domain.Slice; -import org.springframework.util.comparator.ComparableComparator; import java.io.IOException; import java.util.ArrayList; import java.util.Collections; +import java.util.Comparator; import java.util.Date; import java.util.List; import java.util.UUID; @@ -132,7 +133,6 @@ import static org.junit.jupiter.api.Assertions.assertNull; import static org.junit.jupiter.api.Assertions.assertTrue; import static org.junit.jupiter.api.Assertions.fail; import static org.mockito.ArgumentMatchers.eq; -import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; /** @@ -229,14 +229,14 @@ public class FhirResourceDaoR4QueryCountTest extends BaseResourceProviderR4Test p.getMeta().addTag().setSystem("http://foo").setCode("bar"); p.setActive(true); p.addName().setFamily("FOO"); - myPatientDao.update(p).getId(); + myPatientDao.update(p, mySrd); for (int j = 0; j < 5; j++) { p.setActive(!p.getActive()); - myPatientDao.update(p); + myPatientDao.update(p, mySrd); } - myPatientDao.delete(new IdType("Patient/TEST" + i)); + myPatientDao.delete(new IdType("Patient/TEST" + i), mySrd); } myStorageSettings.setExpungeEnabled(true); @@ -344,7 +344,7 @@ public class FhirResourceDaoR4QueryCountTest extends BaseResourceProviderR4Test Patient p = new Patient(); p.addIdentifier().setSystem("urn:system").setValue("2"); p.setManagingOrganization(new Reference(orgId)); - return myPatientDao.create(p).getId().toUnqualified(); + return myPatientDao.create(p, mySrd).getId().toUnqualified(); }); myCaptureQueriesListener.clear(); @@ -353,7 +353,7 @@ public class FhirResourceDaoR4QueryCountTest extends BaseResourceProviderR4Test p.setId(id.getIdPart()); p.addIdentifier().setSystem("urn:system").setValue("2"); p.setManagingOrganization(new Reference(orgId)); - myPatientDao.update(p); + myPatientDao.update(p, mySrd); }); myCaptureQueriesListener.logSelectQueriesForCurrentThread(); assertThat(myCaptureQueriesListener.getSelectQueriesForCurrentThread()).hasSize(4); @@ -375,7 +375,7 @@ public class FhirResourceDaoR4QueryCountTest extends BaseResourceProviderR4Test Patient p = new Patient(); p.addIdentifier().setSystem("urn:system").setValue("2"); p.setManagingOrganization(new Reference(orgId)); - return myPatientDao.create(p).getId().toUnqualified(); + return myPatientDao.create(p, mySrd).getId().toUnqualified(); }); myCaptureQueriesListener.clear(); @@ -384,7 +384,7 @@ public class FhirResourceDaoR4QueryCountTest extends BaseResourceProviderR4Test p.setId(id.getIdPart()); p.addIdentifier().setSystem("urn:system").setValue("3"); p.setManagingOrganization(new Reference(orgId2)); - myPatientDao.update(p).getResource(); + assertNotNull(myPatientDao.update(p, mySrd).getResource()); }); myCaptureQueriesListener.logSelectQueriesForCurrentThread(); assertThat(myCaptureQueriesListener.getSelectQueriesForCurrentThread()).hasSize(5); @@ -458,19 +458,17 @@ public class FhirResourceDaoR4QueryCountTest extends BaseResourceProviderR4Test Patient p = new Patient(); p.getMeta().addTag("http://system", "foo", "display"); p.addIdentifier().setSystem("urn:system").setValue("2"); - return myPatientDao.create(p).getId().toUnqualified(); + return myPatientDao.create(p, mySrd).getId().toUnqualified(); }); - runInTransaction(() -> { - assertEquals(1, myResourceTagDao.count()); - }); + runInTransaction(() -> assertEquals(1, myResourceTagDao.count())); myCaptureQueriesListener.clear(); runInTransaction(() -> { Patient p = new Patient(); p.setId(id.getIdPart()); p.addIdentifier().setSystem("urn:system").setValue("3"); - IBaseResource newRes = myPatientDao.update(p).getResource(); + IBaseResource newRes = myPatientDao.update(p, mySrd).getResource(); assertEquals(1, newRes.getMeta().getTag().size()); }); myCaptureQueriesListener.logSelectQueriesForCurrentThread(); @@ -628,12 +626,12 @@ public class FhirResourceDaoR4QueryCountTest extends BaseResourceProviderR4Test IIdType id = runInTransaction(() -> { Patient p = new Patient(); p.addIdentifier().setSystem("urn:system").setValue("2"); - return myPatientDao.create(p).getId().toUnqualified(); + return myPatientDao.create(p, mySrd).getId().toUnqualified(); }); myCaptureQueriesListener.clear(); runInTransaction(() -> { - myPatientDao.read(id.toVersionless()); + myPatientDao.read(id.toVersionless(), mySrd); }); myCaptureQueriesListener.logSelectQueriesForCurrentThread(); assertThat(myCaptureQueriesListener.getSelectQueriesForCurrentThread()).hasSize(2); @@ -916,6 +914,7 @@ public class FhirResourceDaoR4QueryCountTest extends BaseResourceProviderR4Test assertEquals(10, outcome.getDeletedEntities().size()); } + @SuppressWarnings("unchecked") @Test public void testDeleteExpungeStep() { // Setup @@ -961,7 +960,7 @@ public class FhirResourceDaoR4QueryCountTest extends BaseResourceProviderR4Test Patient p = new Patient(); p.setId("AAA"); p.getMaritalStatus().setText("123"); - myPatientDao.update(p).getId().toUnqualified(); + myPatientDao.update(p, mySrd).getId().toUnqualified(); }); @@ -972,7 +971,7 @@ public class FhirResourceDaoR4QueryCountTest extends BaseResourceProviderR4Test Patient p = new Patient(); p.setId("AAA"); p.getMaritalStatus().setText("456"); - myPatientDao.update(p).getId().toUnqualified(); + myPatientDao.update(p, mySrd).getId().toUnqualified(); }); myCaptureQueriesListener.logSelectQueriesForCurrentThread(); @@ -991,7 +990,7 @@ public class FhirResourceDaoR4QueryCountTest extends BaseResourceProviderR4Test Patient p = new Patient(); p.setId("AAA"); p.getMaritalStatus().setText("789"); - myPatientDao.update(p).getId().toUnqualified(); + myPatientDao.update(p, mySrd).getId().toUnqualified(); }); myCaptureQueriesListener.logSelectQueriesForCurrentThread(); @@ -1043,7 +1042,7 @@ public class FhirResourceDaoR4QueryCountTest extends BaseResourceProviderR4Test myCaptureQueriesListener.clear(); observation = new Observation(); observation.getSubject().setReference("Patient/P"); - myObservationDao.create(observation); + myObservationDao.create(observation, mySrd); // select: lookup forced ID assertEquals(1, myCaptureQueriesListener.countSelectQueriesForCurrentThread()); assertEquals(0, myCaptureQueriesListener.countDeleteQueriesForCurrentThread()); @@ -1067,7 +1066,7 @@ public class FhirResourceDaoR4QueryCountTest extends BaseResourceProviderR4Test patient.setActive(true); myCaptureQueriesListener.clear(); - myPatientDao.update(patient); + myPatientDao.update(patient, mySrd); /* * Add a resource with a forced ID target link @@ -1076,7 +1075,7 @@ public class FhirResourceDaoR4QueryCountTest extends BaseResourceProviderR4Test myCaptureQueriesListener.clear(); Observation observation = new Observation(); observation.getSubject().setReference("Patient/P"); - myObservationDao.create(observation); + myObservationDao.create(observation, mySrd); myCaptureQueriesListener.logSelectQueries(); assertEquals(0, myCaptureQueriesListener.countSelectQueriesForCurrentThread()); assertEquals(0, myCaptureQueriesListener.countDeleteQueriesForCurrentThread()); @@ -1092,7 +1091,7 @@ public class FhirResourceDaoR4QueryCountTest extends BaseResourceProviderR4Test myCaptureQueriesListener.clear(); observation = new Observation(); observation.getSubject().setReference("Patient/P"); - myObservationDao.create(observation); + myObservationDao.create(observation, mySrd); // select: no lookups needed because of cache assertEquals(0, myCaptureQueriesListener.countSelectQueriesForCurrentThread()); assertEquals(0, myCaptureQueriesListener.countDeleteQueriesForCurrentThread()); @@ -1219,16 +1218,16 @@ public class FhirResourceDaoR4QueryCountTest extends BaseResourceProviderR4Test Patient p = new Patient(); p.setId("A"); p.addIdentifier().setSystem("urn:system").setValue("1"); - myPatientDao.update(p).getId().toUnqualified(); + myPatientDao.update(p, mySrd).getId().toUnqualified(); p = new Patient(); p.setId("B"); p.addIdentifier().setSystem("urn:system").setValue("2"); - myPatientDao.update(p).getId().toUnqualified(); + myPatientDao.update(p, mySrd).getId().toUnqualified(); p = new Patient(); p.addIdentifier().setSystem("urn:system").setValue("2"); - myPatientDao.create(p).getId().toUnqualified(); + myPatientDao.create(p, mySrd).getId().toUnqualified(); }); myCaptureQueriesListener.clear(); @@ -1268,8 +1267,7 @@ public class FhirResourceDaoR4QueryCountTest extends BaseResourceProviderR4Test /** * This could definitely stand to be optimized some, since we load tags individually * for each resource - */ - /** + * * See the class javadoc before changing the counts in this test! */ @Test @@ -1282,20 +1280,20 @@ public class FhirResourceDaoR4QueryCountTest extends BaseResourceProviderR4Test p.getMeta().addTag("system", "code2", "displaY2"); p.setId("A"); p.addIdentifier().setSystem("urn:system").setValue("1"); - myPatientDao.update(p).getId().toUnqualified(); + myPatientDao.update(p, mySrd).getId().toUnqualified(); p = new Patient(); p.getMeta().addTag("system", "code1", "displaY1"); p.getMeta().addTag("system", "code2", "displaY2"); p.setId("B"); p.addIdentifier().setSystem("urn:system").setValue("2"); - myPatientDao.update(p).getId().toUnqualified(); + myPatientDao.update(p, mySrd).getId().toUnqualified(); p = new Patient(); p.getMeta().addTag("system", "code1", "displaY1"); p.getMeta().addTag("system", "code2", "displaY2"); p.addIdentifier().setSystem("urn:system").setValue("2"); - myPatientDao.create(p).getId().toUnqualified(); + myPatientDao.create(p, mySrd).getId().toUnqualified(); }); myCaptureQueriesListener.clear(); @@ -1347,16 +1345,16 @@ public class FhirResourceDaoR4QueryCountTest extends BaseResourceProviderR4Test } assertThat(foundIds).hasSize(ids.size()); - ids.sort(new ComparableComparator<>()); - foundIds.sort(new ComparableComparator<>()); + ids.sort(Comparator.naturalOrder()); + foundIds.sort(Comparator.naturalOrder()); assertEquals(ids, foundIds); // This really generates a surprising number of selects and commits. We // could stand to reduce this! myCaptureQueriesListener.logSelectQueries(); assertEquals(56, myCaptureQueriesListener.countSelectQueries()); - assertEquals(71, myCaptureQueriesListener.getCommitCount()); - assertEquals(0, myCaptureQueriesListener.getRollbackCount()); + assertEquals(71, myCaptureQueriesListener.countCommits()); + assertEquals(0, myCaptureQueriesListener.countRollbacks()); } /** @@ -1375,13 +1373,13 @@ public class FhirResourceDaoR4QueryCountTest extends BaseResourceProviderR4Test search = myPagingProvider.retrieveResultList(mySrd, search.getUuid()); } - ids.sort(new ComparableComparator<>()); - foundIds.sort(new ComparableComparator<>()); + ids.sort(Comparator.naturalOrder()); + foundIds.sort(Comparator.naturalOrder()); assertEquals(ids, foundIds); assertEquals(22, myCaptureQueriesListener.countSelectQueries()); - assertEquals(21, myCaptureQueriesListener.getCommitCount()); - assertEquals(0, myCaptureQueriesListener.getRollbackCount()); + assertEquals(21, myCaptureQueriesListener.countCommits()); + assertEquals(0, myCaptureQueriesListener.countRollbacks()); } /** @@ -1399,13 +1397,13 @@ public class FhirResourceDaoR4QueryCountTest extends BaseResourceProviderR4Test nextChunk.forEach(t -> foundIds.add(t.getIdElement().toUnqualifiedVersionless().getValue())); } - ids.sort(new ComparableComparator<>()); - foundIds.sort(new ComparableComparator<>()); + ids.sort(Comparator.naturalOrder()); + foundIds.sort(Comparator.naturalOrder()); assertEquals(ids, foundIds); assertEquals(2, myCaptureQueriesListener.countSelectQueries()); - assertEquals(1, myCaptureQueriesListener.getCommitCount()); - assertEquals(0, myCaptureQueriesListener.getRollbackCount()); + assertEquals(1, myCaptureQueriesListener.countCommits()); + assertEquals(0, myCaptureQueriesListener.countRollbacks()); } @Nonnull @@ -1532,18 +1530,18 @@ public class FhirResourceDaoR4QueryCountTest extends BaseResourceProviderR4Test Patient patient = new Patient(); patient.setId("P"); patient.setActive(true); - myPatientDao.update(patient); + myPatientDao.update(patient, mySrd); Observation obs = new Observation(); obs.getSubject().setReference("Patient/P"); - myObservationDao.create(obs); + myObservationDao.create(obs, mySrd); SearchParameterMap map = new SearchParameterMap(); map.setLoadSynchronous(true); map.add("subject", new ReferenceParam("Patient/P")); myCaptureQueriesListener.clear(); - assertEquals(1, myObservationDao.search(map).size().intValue()); + assertEquals(1, myObservationDao.search(map, mySrd).sizeOrThrowNpe()); // (not resolve forced ID), Perform search, load result assertEquals(2, myCaptureQueriesListener.countSelectQueriesForCurrentThread()); assertNoPartitionSelectors(); @@ -1556,7 +1554,7 @@ public class FhirResourceDaoR4QueryCountTest extends BaseResourceProviderR4Test */ myCaptureQueriesListener.clear(); - assertEquals(1, myObservationDao.search(map).size().intValue()); + assertEquals(1, myObservationDao.search(map, mySrd).sizeOrThrowNpe()); myCaptureQueriesListener.logAllQueriesForCurrentThread(); // (not resolve forced ID), Perform search, load result (this time we reuse the cached forced-id resolution) assertEquals(2, myCaptureQueriesListener.countSelectQueriesForCurrentThread()); @@ -1576,18 +1574,18 @@ public class FhirResourceDaoR4QueryCountTest extends BaseResourceProviderR4Test Patient patient = new Patient(); patient.setId("P"); patient.setActive(true); - myPatientDao.update(patient); + myPatientDao.update(patient, mySrd); Observation obs = new Observation(); obs.getSubject().setReference("Patient/P"); - myObservationDao.create(obs); + myObservationDao.create(obs, mySrd); SearchParameterMap map = new SearchParameterMap(); map.setLoadSynchronous(true); map.add("subject", new ReferenceParam("Patient/P")); myCaptureQueriesListener.clear(); - assertEquals(1, myObservationDao.search(map).size().intValue()); + assertEquals(1, myObservationDao.search(map, mySrd).sizeOrThrowNpe()); myCaptureQueriesListener.logAllQueriesForCurrentThread(); // (not Resolve forced ID), Perform search, load result assertEquals(2, myCaptureQueriesListener.countSelectQueriesForCurrentThread()); @@ -1600,7 +1598,7 @@ public class FhirResourceDaoR4QueryCountTest extends BaseResourceProviderR4Test */ myCaptureQueriesListener.clear(); - assertEquals(1, myObservationDao.search(map).size().intValue()); + assertEquals(1, myObservationDao.search(map, mySrd).sizeOrThrowNpe()); myCaptureQueriesListener.logAllQueriesForCurrentThread(); // (NO resolve forced ID), Perform search, load result assertEquals(2, myCaptureQueriesListener.countSelectQueriesForCurrentThread()); @@ -1618,16 +1616,16 @@ public class FhirResourceDaoR4QueryCountTest extends BaseResourceProviderR4Test Patient patient = new Patient(); patient.setId("P"); patient.addIdentifier().setSystem("sys").setValue("val"); - myPatientDao.update(patient); + myPatientDao.update(patient, mySrd); Observation obs = new Observation(); obs.setId("O"); obs.getSubject().setReference("Patient/P"); - myObservationDao.update(obs); + myObservationDao.update(obs, mySrd); SearchParameterMap map = SearchParameterMap.newSynchronous(Observation.SP_SUBJECT, new ReferenceParam("identifier", "sys|val")); myCaptureQueriesListener.clear(); - IBundleProvider outcome = myObservationDao.search(map); + IBundleProvider outcome = myObservationDao.search(map, mySrd); assertThat(toUnqualifiedVersionlessIdValues(outcome)).containsExactlyInAnyOrder("Observation/O"); assertEquals(2, myCaptureQueriesListener.countSelectQueriesForCurrentThread()); @@ -1650,32 +1648,32 @@ public class FhirResourceDaoR4QueryCountTest extends BaseResourceProviderR4Test patient.getMeta().addTag("http://system", "value1", "display"); patient.setId("P1"); patient.getNameFirstRep().setFamily("FAM1"); - myPatientDao.update(patient); + myPatientDao.update(patient, mySrd); patient = new Patient(); patient.setId("P2"); patient.getMeta().addTag("http://system", "value1", "display"); patient.getNameFirstRep().setFamily("FAM2"); - myPatientDao.update(patient); + myPatientDao.update(patient, mySrd); for (int i = 0; i < 3; i++) { CareTeam ct = new CareTeam(); ct.setId("CT1-" + i); ct.getMeta().addTag("http://system", "value11", "display"); ct.getSubject().setReference("Patient/P1"); - myCareTeamDao.update(ct); + myCareTeamDao.update(ct, mySrd); ct = new CareTeam(); ct.setId("CT2-" + i); ct.getMeta().addTag("http://system", "value22", "display"); ct.getSubject().setReference("Patient/P2"); - myCareTeamDao.update(ct); + myCareTeamDao.update(ct, mySrd); } SearchParameterMap map = SearchParameterMap.newSynchronous().addRevInclude(CareTeam.INCLUDE_SUBJECT).setSort(new SortSpec(Patient.SP_NAME)); myCaptureQueriesListener.clear(); - IBundleProvider outcome = myPatientDao.search(map); + IBundleProvider outcome = myPatientDao.search(map, mySrd); assertEquals(SimpleBundleProvider.class, outcome.getClass()); assertThat(toUnqualifiedVersionlessIdValues(outcome)).containsExactlyInAnyOrder("Patient/P1", "CareTeam/CT1-0", "CareTeam/CT1-1", "CareTeam/CT1-2", "Patient/P2", "CareTeam/CT2-0", "CareTeam/CT2-1", "CareTeam/CT2-2"); @@ -1924,7 +1922,7 @@ public class FhirResourceDaoR4QueryCountTest extends BaseResourceProviderR4Test Practitioner pract = new Practitioner(); pract.addIdentifier().setSystem("foo").setValue("bar"); - myPractitionerDao.create(pract); + myPractitionerDao.create(pract, mySrd); runInTransaction(() -> assertEquals(1, myResourceTableDao.count(), () -> myResourceTableDao.findAll().stream().map(t -> t.getIdDt().toUnqualifiedVersionless().getValue()).collect(Collectors.joining(",")))); // First pass @@ -2127,7 +2125,7 @@ public class FhirResourceDaoR4QueryCountTest extends BaseResourceProviderR4Test outcome = mySystemDao.transaction(mySrd, input.get()); ourLog.debug("Resp: {}", myFhirContext.newJsonParser().setPrettyPrint(true).encodeResourceToString(outcome)); myCaptureQueriesListener.logSelectQueries(); - assertEquals(4, myCaptureQueriesListener.countSelectQueries()); + assertEquals(3, myCaptureQueriesListener.countSelectQueries()); myCaptureQueriesListener.logInsertQueries(); assertEquals(2, myCaptureQueriesListener.countInsertQueries()); myCaptureQueriesListener.logUpdateQueries(); @@ -2210,7 +2208,7 @@ public class FhirResourceDaoR4QueryCountTest extends BaseResourceProviderR4Test outcome = mySystemDao.transaction(mySrd, input.get()); ourLog.debug("Resp: {}", myFhirContext.newJsonParser().setPrettyPrint(true).encodeResourceToString(outcome)); myCaptureQueriesListener.logSelectQueries(); - assertEquals(5, myCaptureQueriesListener.countSelectQueries()); + assertEquals(4, myCaptureQueriesListener.countSelectQueries()); myCaptureQueriesListener.logInsertQueries(); assertEquals(5, myCaptureQueriesListener.countInsertQueries()); myCaptureQueriesListener.logUpdateQueries(); @@ -2334,7 +2332,7 @@ public class FhirResourceDaoR4QueryCountTest extends BaseResourceProviderR4Test Patient pt = new Patient(); pt.setId("ABC"); pt.setActive(true); - myPatientDao.update(pt); + myPatientDao.update(pt, mySrd); Location loc = new Location(); loc.setId("LOC"); @@ -2522,7 +2520,7 @@ public class FhirResourceDaoR4QueryCountTest extends BaseResourceProviderR4Test outcome = mySystemDao.transaction(mySrd, input.get()); ourLog.debug("Resp: {}", myFhirContext.newJsonParser().setPrettyPrint(true).encodeResourceToString(outcome)); myCaptureQueriesListener.logSelectQueries(); - assertEquals(7, myCaptureQueriesListener.countSelectQueries()); + assertEquals(6, myCaptureQueriesListener.countSelectQueries()); myCaptureQueriesListener.logInsertQueries(); assertEquals(4, myCaptureQueriesListener.countInsertQueries()); myCaptureQueriesListener.logUpdateQueries(); @@ -2537,7 +2535,7 @@ public class FhirResourceDaoR4QueryCountTest extends BaseResourceProviderR4Test outcome = mySystemDao.transaction(mySrd, input.get()); ourLog.debug("Resp: {}", myFhirContext.newJsonParser().setPrettyPrint(true).encodeResourceToString(outcome)); myCaptureQueriesListener.logSelectQueries(); - assertEquals(5, myCaptureQueriesListener.countSelectQueries()); + assertEquals(4, myCaptureQueriesListener.countSelectQueries()); myCaptureQueriesListener.logInsertQueries(); assertEquals(4, myCaptureQueriesListener.countInsertQueries()); myCaptureQueriesListener.logUpdateQueries(); @@ -2579,7 +2577,7 @@ public class FhirResourceDaoR4QueryCountTest extends BaseResourceProviderR4Test assertEquals(0, myCaptureQueriesListener.countDeleteQueries()); runInTransaction(() -> { - List types = myResourceTableDao.findAll().stream().map(t -> t.getResourceType()).collect(Collectors.toList()); + List types = myResourceTableDao.findAll().stream().map(ResourceTable::getResourceType).collect(Collectors.toList()); assertThat(types).containsExactlyInAnyOrder("Patient", "Observation"); }); @@ -2593,7 +2591,7 @@ public class FhirResourceDaoR4QueryCountTest extends BaseResourceProviderR4Test assertEquals(0, myCaptureQueriesListener.countDeleteQueries()); runInTransaction(() -> { - List types = myResourceTableDao.findAll().stream().map(t -> t.getResourceType()).collect(Collectors.toList()); + List types = myResourceTableDao.findAll().stream().map(ResourceTable::getResourceType).collect(Collectors.toList()); assertThat(types).containsExactlyInAnyOrder("Patient", "Observation", "Observation"); }); @@ -2606,7 +2604,7 @@ public class FhirResourceDaoR4QueryCountTest extends BaseResourceProviderR4Test assertEquals(0, myCaptureQueriesListener.countDeleteQueries()); runInTransaction(() -> { - List types = myResourceTableDao.findAll().stream().map(t -> t.getResourceType()).collect(Collectors.toList()); + List types = myResourceTableDao.findAll().stream().map(ResourceTable::getResourceType).collect(Collectors.toList()); assertThat(types).containsExactlyInAnyOrder("Patient", "Observation", "Observation", "Observation"); }); @@ -2644,7 +2642,7 @@ public class FhirResourceDaoR4QueryCountTest extends BaseResourceProviderR4Test assertEquals(0, myCaptureQueriesListener.countDeleteQueries()); runInTransaction(() -> { - List types = myResourceTableDao.findAll().stream().map(t -> t.getResourceType()).collect(Collectors.toList()); + List types = myResourceTableDao.findAll().stream().map(ResourceTable::getResourceType).collect(Collectors.toList()); assertThat(types).containsExactlyInAnyOrder("Patient", "Observation"); }); @@ -2663,7 +2661,7 @@ public class FhirResourceDaoR4QueryCountTest extends BaseResourceProviderR4Test assertThat(matchUrlQuery).contains("fetch first '2'"); runInTransaction(() -> { - List types = myResourceTableDao.findAll().stream().map(t -> t.getResourceType()).collect(Collectors.toList()); + List types = myResourceTableDao.findAll().stream().map(ResourceTable::getResourceType).collect(Collectors.toList()); assertThat(types).containsExactlyInAnyOrder("Patient", "Observation", "Observation"); }); @@ -2676,7 +2674,7 @@ public class FhirResourceDaoR4QueryCountTest extends BaseResourceProviderR4Test assertEquals(0, myCaptureQueriesListener.countDeleteQueries()); runInTransaction(() -> { - List types = myResourceTableDao.findAll().stream().map(t -> t.getResourceType()).collect(Collectors.toList()); + List types = myResourceTableDao.findAll().stream().map(ResourceTable::getResourceType).collect(Collectors.toList()); assertThat(types).containsExactlyInAnyOrder("Patient", "Observation", "Observation", "Observation"); }); @@ -2848,12 +2846,12 @@ public class FhirResourceDaoR4QueryCountTest extends BaseResourceProviderR4Test Patient patient = new Patient(); patient.setId("Patient/A"); patient.setActive(true); - myPatientDao.update(patient); + myPatientDao.update(patient, mySrd); Practitioner practitioner = new Practitioner(); practitioner.setId("Practitioner/B"); practitioner.setActive(true); - myPractitionerDao.update(practitioner); + myPractitionerDao.update(practitioner, mySrd); // Create transaction @@ -2921,11 +2919,11 @@ public class FhirResourceDaoR4QueryCountTest extends BaseResourceProviderR4Test Patient patient = new Patient(); patient.setActive(true); - IIdType patientId = myPatientDao.create(patient).getId().toUnqualifiedVersionless(); + IIdType patientId = myPatientDao.create(patient, mySrd).getId().toUnqualifiedVersionless(); Practitioner practitioner = new Practitioner(); practitioner.setActive(true); - IIdType practitionerId = myPractitionerDao.create(practitioner).getId().toUnqualifiedVersionless(); + IIdType practitionerId = myPractitionerDao.create(practitioner, mySrd).getId().toUnqualifiedVersionless(); // Create transaction Bundle input = new Bundle(); @@ -2994,12 +2992,12 @@ public class FhirResourceDaoR4QueryCountTest extends BaseResourceProviderR4Test Patient patient = new Patient(); patient.setId("Patient/A"); patient.setActive(true); - myPatientDao.update(patient); + myPatientDao.update(patient, mySrd); Practitioner practitioner = new Practitioner(); practitioner.setId("Practitioner/B"); practitioner.setActive(true); - myPractitionerDao.update(practitioner); + myPractitionerDao.update(practitioner, mySrd); // Create transaction @@ -3067,11 +3065,11 @@ public class FhirResourceDaoR4QueryCountTest extends BaseResourceProviderR4Test Patient patient = new Patient(); patient.setActive(true); - IIdType patientId = myPatientDao.create(patient).getId().toUnqualifiedVersionless(); + IIdType patientId = myPatientDao.create(patient, mySrd).getId().toUnqualifiedVersionless(); Practitioner practitioner = new Practitioner(); practitioner.setActive(true); - IIdType practitionerId = myPractitionerDao.create(practitioner).getId().toUnqualifiedVersionless(); + IIdType practitionerId = myPractitionerDao.create(practitioner, mySrd).getId().toUnqualifiedVersionless(); // Create transaction Bundle input = new Bundle(); @@ -3139,12 +3137,12 @@ public class FhirResourceDaoR4QueryCountTest extends BaseResourceProviderR4Test Patient patient = new Patient(); patient.setId("Patient/A"); patient.setActive(true); - myPatientDao.update(patient); + myPatientDao.update(patient, mySrd); Practitioner practitioner = new Practitioner(); practitioner.setId("Practitioner/B"); practitioner.setActive(true); - myPractitionerDao.update(practitioner); + myPractitionerDao.update(practitioner, mySrd); // Create transaction @@ -3750,7 +3748,7 @@ public class FhirResourceDaoR4QueryCountTest extends BaseResourceProviderR4Test myCaptureQueriesListener.clear(); Bundle outcome = mySystemDao.transaction(new SystemRequestDetails(), supplier.get()); myCaptureQueriesListener.logSelectQueries(); - assertEquals(6, myCaptureQueriesListener.countSelectQueriesForCurrentThread()); + assertEquals(5, myCaptureQueriesListener.countSelectQueriesForCurrentThread()); myCaptureQueriesListener.logInsertQueries(); assertEquals(4, myCaptureQueriesListener.countInsertQueriesForCurrentThread()); assertEquals(7, myCaptureQueriesListener.countUpdateQueriesForCurrentThread()); @@ -3773,7 +3771,7 @@ public class FhirResourceDaoR4QueryCountTest extends BaseResourceProviderR4Test myCaptureQueriesListener.clear(); outcome = mySystemDao.transaction(new SystemRequestDetails(), supplier.get()); - assertEquals(6, myCaptureQueriesListener.countSelectQueriesForCurrentThread()); + assertEquals(5, myCaptureQueriesListener.countSelectQueriesForCurrentThread()); myCaptureQueriesListener.logInsertQueries(); assertEquals(4, myCaptureQueriesListener.countInsertQueriesForCurrentThread()); assertEquals(6, myCaptureQueriesListener.countUpdateQueriesForCurrentThread()); @@ -3834,11 +3832,65 @@ public class FhirResourceDaoR4QueryCountTest extends BaseResourceProviderR4Test myCaptureQueriesListener.clear(); mySystemDao.transaction(new SystemRequestDetails(), loadResourceFromClasspath(Bundle.class, "r4/transaction-perf-bundle-smallchanges.json")); myCaptureQueriesListener.logSelectQueriesForCurrentThread(); - assertEquals(6, myCaptureQueriesListener.countSelectQueriesForCurrentThread()); + assertEquals(5, myCaptureQueriesListener.countSelectQueriesForCurrentThread()); assertEquals(2, myCaptureQueriesListener.countInsertQueriesForCurrentThread()); assertEquals(6, myCaptureQueriesListener.countUpdateQueriesForCurrentThread()); assertEquals(1, myCaptureQueriesListener.countDeleteQueriesForCurrentThread()); + } + + /** + * See the class javadoc before changing the counts in this test! + */ + @Test + public void testMassIngestionMode_TransactionWithManyUpdates() { + myStorageSettings.setMassIngestionMode(true); + + for (int i = 0; i < 10; i++) { + Organization org = new Organization(); + org.setId("ORG" + i); + org.setName("ORG " + i); + myOrganizationDao.update(org, mySrd); + } + for (int i = 0; i < 5; i++) { + Patient patient = new Patient(); + patient.setId("PT" + i); + patient.setActive(true); + patient.setManagingOrganization(new Reference("Organization/ORG" + i)); + myPatientDao.update(patient, mySrd); + } + + Supplier supplier = () -> { + BundleBuilder bb = new BundleBuilder(myFhirContext); + + for (int i = 0; i < 10; i++) { + Patient patient = new Patient(); + patient.setId("PT" + i); + // Flip this value + patient.setActive(false); + patient.addIdentifier().setSystem("http://foo").setValue("bar"); + patient.setManagingOrganization(new Reference("Organization/ORG" + i)); + bb.addTransactionUpdateEntry(patient); + } + + return (Bundle) bb.getBundle(); + }; + + // Test + + myCaptureQueriesListener.clear(); + myMemoryCacheService.invalidateAllCaches(); + mySystemDao.transaction(new SystemRequestDetails(), supplier.get()); + myCaptureQueriesListener.logSelectQueries(); + myCaptureQueriesListener.logInsertQueries(); + assertEquals(3, myCaptureQueriesListener.countSelectQueriesForCurrentThread()); + assertEquals(40, myCaptureQueriesListener.countInsertQueriesForCurrentThread()); + assertEquals(10, myCaptureQueriesListener.countUpdateQueriesForCurrentThread()); + assertEquals(0, myCaptureQueriesListener.countDeleteQueriesForCurrentThread()); + assertEquals(0, myCaptureQueriesListener.countInsertQueriesRepeated()); + + + } /** @@ -3860,7 +3912,7 @@ public class FhirResourceDaoR4QueryCountTest extends BaseResourceProviderR4Test assertEquals(1, myCaptureQueriesListener.countUpdateQueriesForCurrentThread()); assertEquals(3, myCaptureQueriesListener.countDeleteQueriesForCurrentThread()); runInTransaction(() -> { - ResourceTable version = myResourceTableDao.findById(patientId.getIdPartAsLong()).orElseThrow(); + ResourceTable version = myResourceTableDao.findById(JpaPid.fromId(patientId.getIdPartAsLong())).orElseThrow(); assertFalse(version.isParamsTokenPopulated()); assertFalse(version.isHasLinks()); assertEquals(0, myResourceIndexedSearchParamTokenDao.count()); @@ -3882,7 +3934,7 @@ public class FhirResourceDaoR4QueryCountTest extends BaseResourceProviderR4Test IIdType idDt = myObservationDao.create(observation, mySrd).getEntity().getIdDt(); runInTransaction(() -> { assertEquals(4, myResourceIndexedSearchParamTokenDao.count()); - ResourceTable version = myResourceTableDao.findById(idDt.getIdPartAsLong()).orElseThrow(); + ResourceTable version = myResourceTableDao.findById(JpaPid.fromId(idDt.getIdPartAsLong())).orElseThrow(); assertTrue(version.isParamsTokenPopulated()); }); @@ -3894,7 +3946,7 @@ public class FhirResourceDaoR4QueryCountTest extends BaseResourceProviderR4Test assertQueryCount(3, 1, 1, 2); runInTransaction(() -> { assertEquals(0, myResourceIndexedSearchParamTokenDao.count()); - ResourceTable version = myResourceTableDao.findById(idDt.getIdPartAsLong()).orElseThrow(); + ResourceTable version = myResourceTableDao.findById(JpaPid.fromId(idDt.getIdPartAsLong())).orElseThrow(); assertFalse(version.isParamsTokenPopulated()); }); } @@ -3950,12 +4002,10 @@ public class FhirResourceDaoR4QueryCountTest extends BaseResourceProviderR4Test String encoded; IIdType id = null; - int initialAdditionalSelects = 0; if (theStoredInRepository) { id = myPatientDao.create(resource, mySrd).getId(); resource = null; encoded = null; - initialAdditionalSelects = 1; } else { resource.setId("A"); encoded = myFhirContext.newJsonParser().setPrettyPrint(true).encodeResourceToString(resource); @@ -4149,7 +4199,7 @@ public class FhirResourceDaoR4QueryCountTest extends BaseResourceProviderR4Test Patient p = new Patient(); p.getMeta().addTag("http://system", "foo", "display"); p.addIdentifier().setSystem("urn:system").setValue("2"); - return myPatientDao.create(p).getId().toUnqualified(); + return myPatientDao.create(p, mySrd).getId().toUnqualified(); }); } diff --git a/hapi-fhir-jpaserver-test-r4/src/test/java/ca/uhn/fhir/jpa/dao/r4/PartitioningSqlR4Test.java b/hapi-fhir-jpaserver-test-r4/src/test/java/ca/uhn/fhir/jpa/dao/r4/PartitioningSqlR4Test.java index 7e2ab8dc998..64ba2efa2d6 100644 --- a/hapi-fhir-jpaserver-test-r4/src/test/java/ca/uhn/fhir/jpa/dao/r4/PartitioningSqlR4Test.java +++ b/hapi-fhir-jpaserver-test-r4/src/test/java/ca/uhn/fhir/jpa/dao/r4/PartitioningSqlR4Test.java @@ -116,7 +116,7 @@ public class PartitioningSqlR4Test extends BasePartitioningR4Test { @BeforeEach public void disableAdvanceIndexing() { - myStorageSettings.setAdvancedHSearchIndexing(false); + myStorageSettings.setHibernateSearchIndexSearchParams(false); // ugh - somewhere the hibernate round trip is mangling LocalDate to h2 date column unless the tz=GMT TimeZone.setDefault(TimeZone.getTimeZone("GMT")); ourLog.info("Running with Timezone {}", TimeZone.getDefault().getID()); @@ -3053,7 +3053,7 @@ public class PartitioningSqlR4Test extends BasePartitioningR4Test { outcome = mySystemDao.transaction(mySrd, input.get()); ourLog.debug("Resp: {}", myFhirContext.newJsonParser().setPrettyPrint(true).encodeResourceToString(outcome)); myCaptureQueriesListener.logSelectQueriesForCurrentThread(); - assertEquals(7, myCaptureQueriesListener.countSelectQueriesForCurrentThread()); + assertEquals(6, myCaptureQueriesListener.countSelectQueriesForCurrentThread()); myCaptureQueriesListener.logInsertQueriesForCurrentThread(); assertEquals(4, myCaptureQueriesListener.countInsertQueriesForCurrentThread()); myCaptureQueriesListener.logUpdateQueriesForCurrentThread(); @@ -3069,7 +3069,7 @@ public class PartitioningSqlR4Test extends BasePartitioningR4Test { outcome = mySystemDao.transaction(mySrd, input.get()); ourLog.debug("Resp: {}", myFhirContext.newJsonParser().setPrettyPrint(true).encodeResourceToString(outcome)); myCaptureQueriesListener.logSelectQueriesForCurrentThread(); - assertEquals(5, myCaptureQueriesListener.countSelectQueriesForCurrentThread()); + assertEquals(4, myCaptureQueriesListener.countSelectQueriesForCurrentThread()); myCaptureQueriesListener.logInsertQueriesForCurrentThread(); assertEquals(4, myCaptureQueriesListener.countInsertQueriesForCurrentThread()); myCaptureQueriesListener.logUpdateQueriesForCurrentThread(); diff --git a/hapi-fhir-jpaserver-test-r5/src/test/java/ca/uhn/fhir/jpa/subscription/websocket/WebsocketWithSubscriptionIdR5Test.java b/hapi-fhir-jpaserver-test-r5/src/test/java/ca/uhn/fhir/jpa/subscription/websocket/WebsocketWithSubscriptionIdR5Test.java index c7b08aa39c4..8a7e2d04d9c 100644 --- a/hapi-fhir-jpaserver-test-r5/src/test/java/ca/uhn/fhir/jpa/subscription/websocket/WebsocketWithSubscriptionIdR5Test.java +++ b/hapi-fhir-jpaserver-test-r5/src/test/java/ca/uhn/fhir/jpa/subscription/websocket/WebsocketWithSubscriptionIdR5Test.java @@ -112,13 +112,7 @@ public class WebsocketWithSubscriptionIdR5Test extends BaseSubscriptionsR5Test { // Then List messages = myWebsocketClientExtension.getMessages(); - await().until(() -> !messages.isEmpty()); - - // Log it - ourLog.info("Messages: {}", messages); - - // Verify a ping message shall be returned - Assertions.assertTrue(messages.contains("ping " + subscriptionId)); + await().until(() -> messages, t -> t.contains("ping " + subscriptionId)); } @Test