From d952c87d9b7cc12e1f79299a3d670a16dd754a15 Mon Sep 17 00:00:00 2001 From: James Agnew Date: Wed, 19 Apr 2023 15:09:12 -0400 Subject: [PATCH] Rework reindex --- .../fhir/jpa/dao/BaseHapiFhirResourceDao.java | 36 +++---- .../fhir/jpa/dao/BaseHapiFhirSystemDao.java | 60 ++++++----- .../fhir/jpa/dao/TransactionProcessor.java | 2 +- .../dao/data/IResourceHistoryTableDao.java | 6 ++ .../r4/FhirResourceDaoR4QueryCountTest.java | 101 ++++++++++++++++++ .../fhir/jpa/delete/job/ReindexJobTest.java | 85 +++++++++++++++ .../stresstest/GiantTransactionPerfTest.java | 10 ++ .../batch2/jobs/imprt/ConsumeFilesStep.java | 2 +- .../jobs/reindex/ReindexJobParameters.java | 6 +- .../fhir/batch2/jobs/reindex/ReindexStep.java | 4 +- .../fhir/jpa/api/dao/IFhirResourceDao.java | 6 +- .../uhn/fhir/jpa/api/dao/IFhirSystemDao.java | 2 +- 12 files changed, 260 insertions(+), 60 deletions(-) diff --git a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/dao/BaseHapiFhirResourceDao.java b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/dao/BaseHapiFhirResourceDao.java index 6868767cf39..1bd900b4fe6 100644 --- a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/dao/BaseHapiFhirResourceDao.java +++ b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/dao/BaseHapiFhirResourceDao.java @@ -140,6 +140,7 @@ import javax.persistence.NoResultException; import javax.persistence.TypedQuery; import javax.servlet.http.HttpServletResponse; import java.io.IOException; +import java.nio.charset.StandardCharsets; import java.util.ArrayList; import java.util.Collection; import java.util.Date; @@ -1661,34 +1662,25 @@ public abstract class BaseHapiFhirResourceDao extends B } @Override - public void migrateLogToVarChar(IResourcePersistentId theResourcePersistentId) { + public void migrateLobToVarChar(IResourcePersistentId theResourcePersistentId) { Long id = ((JpaPid)theResourcePersistentId).getId(); - ResourceTable entity = - myEntityManager.find(ResourceTable.class, id, LockModeType.OPTIMISTIC); + ResourceTable entity = myResourceTableDao.findById(id).orElse(null); if (entity == null) { ourLog.warn("Unable to find entity with PID: {}", id); } else { - IBaseResource resource = myJpaStorageResourceParser.toResource(entity, false); ResourceHistoryTable historyEntity = entity.getCurrentVersionEntity(); - ResourceEncodingEnum encoding = getConfig().getResourceEncoding(); - List excludeElements = new ArrayList<>(8); - getExcludedElements(historyEntity.getResourceType(), excludeElements, resource.getMeta()); - String encodedResourceString = encodeResource(resource, encoding, excludeElements, myFhirContext); - - - historyEntity = myEntityManager.merge(historyEntity); - if (getConfig().getInlineResourceTextBelowSize() > 0 && encodedResourceString.length() < getConfig().getInlineResourceTextBelowSize()) { - historyEntity.setResourceTextVc(encodedResourceString); - historyEntity.setResource(null); - } else { - historyEntity.setResourceTextVc(null); - byte[] resourceBinary = getResourceBinary(encoding, encodedResourceString); - historyEntity.setResource(resourceBinary); - historyEntity.setResourceTextVc(null); + if (historyEntity != null) { + if (historyEntity.getEncoding() == ResourceEncodingEnum.JSONC || historyEntity.getEncoding() == ResourceEncodingEnum.JSON) { + byte[] resourceBytes = historyEntity.getResource(); + if (resourceBytes != null) { + String resourceText = decodeResource(resourceBytes, historyEntity.getEncoding()); + if (getConfig().getInlineResourceTextBelowSize() > 0 && resourceText.length() < getConfig().getInlineResourceTextBelowSize()) { + ourLog.info("Storing text of resource {} version {} as inline VARCHAR", entity.getResourceId(), entity.getVersion()); + myResourceHistoryTableDao.setResourceTextVcForVersion(historyEntity.getId(), resourceText); + } + } + } } - - historyEntity.setEncoding(encoding); - myResourceHistoryTableDao.save(historyEntity); } } diff --git a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/dao/BaseHapiFhirSystemDao.java b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/dao/BaseHapiFhirSystemDao.java index dc72f6013d6..01bd528c01c 100644 --- a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/dao/BaseHapiFhirSystemDao.java +++ b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/dao/BaseHapiFhirSystemDao.java @@ -157,7 +157,7 @@ public abstract class BaseHapiFhirSystemDao extends B @Override @Transactional(propagation = Propagation.MANDATORY) - public

void preFetchResources(List

theResolvedIds) { + public

void preFetchResources(List

theResolvedIds, boolean thePreFetchIndexes) { List pids = theResolvedIds .stream() .map(t -> ((JpaPid) t).getId()) @@ -182,40 +182,42 @@ public abstract class BaseHapiFhirSystemDao extends B List entityIds; - entityIds = loadedResourceTableEntries.stream().filter(t -> t.isParamsStringPopulated()).map(t->t.getId()).collect(Collectors.toList()); - if (entityIds.size() > 0) { - preFetchIndexes(entityIds, "string", "myParamsString", null); - } + if (thePreFetchIndexes) { + entityIds = loadedResourceTableEntries.stream().filter(t -> t.isParamsStringPopulated()).map(t -> t.getId()).collect(Collectors.toList()); + if (entityIds.size() > 0) { + preFetchIndexes(entityIds, "string", "myParamsString", null); + } - entityIds = loadedResourceTableEntries.stream().filter(t -> t.isParamsTokenPopulated()).map(t->t.getId()).collect(Collectors.toList()); - if (entityIds.size() > 0) { - preFetchIndexes(entityIds, "token", "myParamsToken", null); - } + entityIds = loadedResourceTableEntries.stream().filter(t -> t.isParamsTokenPopulated()).map(t -> t.getId()).collect(Collectors.toList()); + if (entityIds.size() > 0) { + preFetchIndexes(entityIds, "token", "myParamsToken", null); + } - entityIds = loadedResourceTableEntries.stream().filter(t -> t.isParamsDatePopulated()).map(t->t.getId()).collect(Collectors.toList()); - if (entityIds.size() > 0) { - preFetchIndexes(entityIds, "date", "myParamsDate", null); - } + entityIds = loadedResourceTableEntries.stream().filter(t -> t.isParamsDatePopulated()).map(t -> t.getId()).collect(Collectors.toList()); + if (entityIds.size() > 0) { + preFetchIndexes(entityIds, "date", "myParamsDate", null); + } - entityIds = loadedResourceTableEntries.stream().filter(t -> t.isParamsQuantityPopulated()).map(t->t.getId()).collect(Collectors.toList()); - if (entityIds.size() > 0) { - preFetchIndexes(entityIds, "quantity", "myParamsQuantity", null); - } + entityIds = loadedResourceTableEntries.stream().filter(t -> t.isParamsQuantityPopulated()).map(t -> t.getId()).collect(Collectors.toList()); + if (entityIds.size() > 0) { + preFetchIndexes(entityIds, "quantity", "myParamsQuantity", null); + } - entityIds = loadedResourceTableEntries.stream().filter(t -> t.isHasLinks()).map(t->t.getId()).collect(Collectors.toList()); - if (entityIds.size() > 0) { - preFetchIndexes(entityIds, "resourceLinks", "myResourceLinks", null); - } + entityIds = loadedResourceTableEntries.stream().filter(t -> t.isHasLinks()).map(t -> t.getId()).collect(Collectors.toList()); + if (entityIds.size() > 0) { + preFetchIndexes(entityIds, "resourceLinks", "myResourceLinks", null); + } - entityIds = loadedResourceTableEntries.stream().filter(t -> t.isHasTags()).map(t->t.getId()).collect(Collectors.toList()); - if (entityIds.size() > 0) { - myResourceTagDao.findByResourceIds(entityIds); - preFetchIndexes(entityIds, "tags", "myTags", null); - } + entityIds = loadedResourceTableEntries.stream().filter(t -> t.isHasTags()).map(t -> t.getId()).collect(Collectors.toList()); + if (entityIds.size() > 0) { + myResourceTagDao.findByResourceIds(entityIds); + preFetchIndexes(entityIds, "tags", "myTags", null); + } - entityIds = loadedResourceTableEntries.stream().map(t->t.getId()).collect(Collectors.toList()); - if (myDaoConfig.getIndexMissingFields() == DaoConfig.IndexEnabledEnum.ENABLED) { - preFetchIndexes(entityIds, "searchParamPresence", "mySearchParamPresents", null); + entityIds = loadedResourceTableEntries.stream().map(t -> t.getId()).collect(Collectors.toList()); + if (myDaoConfig.getIndexMissingFields() == DaoConfig.IndexEnabledEnum.ENABLED) { + preFetchIndexes(entityIds, "searchParamPresence", "mySearchParamPresents", null); + } } new QueryChunker().chunk(loadedResourceTableEntries, SearchBuilder.getMaximumPageSize() / 2, entries -> { diff --git a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/dao/TransactionProcessor.java b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/dao/TransactionProcessor.java index b10f07b36aa..2a9bbec8b6f 100644 --- a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/dao/TransactionProcessor.java +++ b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/dao/TransactionProcessor.java @@ -276,7 +276,7 @@ public class TransactionProcessor extends BaseTransactionProcessor { } IFhirSystemDao systemDao = myApplicationContext.getBean(IFhirSystemDao.class); - systemDao.preFetchResources(JpaPid.fromLongList(idsToPreFetch)); + systemDao.preFetchResources(JpaPid.fromLongList(idsToPreFetch), true); } diff --git a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/dao/data/IResourceHistoryTableDao.java b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/dao/data/IResourceHistoryTableDao.java index 1b695abe37d..64307345530 100644 --- a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/dao/data/IResourceHistoryTableDao.java +++ b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/dao/data/IResourceHistoryTableDao.java @@ -39,6 +39,8 @@ public interface IResourceHistoryTableDao extends JpaRepository findAllVersionsForResourceIdInOrder(@Param("resId") Long theId); + @Query("SELECT t FROM ResourceHistoryTable t WHERE t.myResourceId = :id AND t.myResourceVersion = :version") + ResourceHistoryTable findForIdAndVersion(@Param("id") long theId, @Param("version") long theVersion); @Query("SELECT t FROM ResourceHistoryTable t LEFT OUTER JOIN FETCH t.myProvenance WHERE t.myResourceId = :id AND t.myResourceVersion = :version") ResourceHistoryTable findForIdAndVersionAndFetchProvenance(@Param("id") long theId, @Param("version") long theVersion); @@ -73,4 +75,8 @@ public interface IResourceHistoryTableDao extends JpaRepository{ + assertEquals(10, myResourceHistoryTableDao.count()); + ResourceHistoryTable history = myResourceHistoryTableDao.findAll().get(0); + assertNull(history.getResourceTextVc()); + assertNotNull(history.getResource()); + }); + + myDaoConfig.setInlineResourceTextBelowSize(10000); + ReindexJobParameters params = new ReindexJobParameters() + .setOptimizeStorage(true) + .setReindexSearchParameters(false); + + // execute + myCaptureQueriesListener.clear(); + RunOutcome outcome = myReindexStep.doReindex(data, mock(IJobDataSink.class), "123", "456", params); + + // validate + myCaptureQueriesListener.logSelectQueriesForCurrentThread(); + assertEquals(2, myCaptureQueriesListener.getSelectQueriesForCurrentThread().size()); + assertEquals(10, myCaptureQueriesListener.getUpdateQueriesForCurrentThread().size()); + assertEquals(0, myCaptureQueriesListener.getInsertQueriesForCurrentThread().size()); + assertEquals(0, myCaptureQueriesListener.getDeleteQueriesForCurrentThread().size()); + + assertEquals(10, outcome.getRecordsProcessed()); + runInTransaction(()->{ + assertEquals(10, myResourceHistoryTableDao.count()); + ResourceHistoryTable history = myResourceHistoryTableDao.findAll().get(0); + assertNotNull(history.getResourceTextVc()); + assertNull(history.getResource()); + }); + Patient patient = myPatientDao.read(patientId, mySrd); + assertTrue(patient.getActive()); + + } + + + @Test + public void testReindexJob_OptimizeStorage_NoOp() { + // Setup + + // Inlined already, so no reindexing needed + myDaoConfig.setInlineResourceTextBelowSize(10000); + + ResourceIdListWorkChunkJson data = new ResourceIdListWorkChunkJson(); + IIdType patientId = createPatient(withActiveTrue()); + data.addTypedPid("Patient", patientId.getIdPartAsLong()); + for (int i = 0; i < 9; i++) { + IIdType nextPatientId = createPatient(withActiveTrue()); + data.addTypedPid("Patient", nextPatientId.getIdPartAsLong()); + } + + runInTransaction(()->{ + assertEquals(10, myResourceHistoryTableDao.count()); + ResourceHistoryTable history = myResourceHistoryTableDao.findAll().get(0); + assertNotNull(history.getResourceTextVc()); + assertNull(history.getResource()); + }); + + ReindexJobParameters params = new ReindexJobParameters() + .setOptimizeStorage(true) + .setReindexSearchParameters(false); + + // execute + myCaptureQueriesListener.clear(); + RunOutcome outcome = myReindexStep.doReindex(data, mock(IJobDataSink.class), "123", "456", params); + + // validate + myCaptureQueriesListener.logSelectQueriesForCurrentThread(); + assertEquals(2, myCaptureQueriesListener.getSelectQueriesForCurrentThread().size()); + assertEquals(0, myCaptureQueriesListener.getUpdateQueriesForCurrentThread().size()); + assertEquals(0, myCaptureQueriesListener.getInsertQueriesForCurrentThread().size()); + assertEquals(0, myCaptureQueriesListener.getDeleteQueriesForCurrentThread().size()); + + } @Test public void testSearchAndPageThroughResults_SmallChunksOnSameBundleProvider() { diff --git a/hapi-fhir-jpaserver-test-r4/src/test/java/ca/uhn/fhir/jpa/delete/job/ReindexJobTest.java b/hapi-fhir-jpaserver-test-r4/src/test/java/ca/uhn/fhir/jpa/delete/job/ReindexJobTest.java index 964367b365e..880ed700258 100644 --- a/hapi-fhir-jpaserver-test-r4/src/test/java/ca/uhn/fhir/jpa/delete/job/ReindexJobTest.java +++ b/hapi-fhir-jpaserver-test-r4/src/test/java/ca/uhn/fhir/jpa/delete/job/ReindexJobTest.java @@ -8,13 +8,17 @@ import ca.uhn.fhir.batch2.model.JobInstanceStartRequest; import ca.uhn.fhir.batch2.model.StatusEnum; import ca.uhn.fhir.interceptor.api.IAnonymousInterceptor; import ca.uhn.fhir.interceptor.api.Pointcut; +import ca.uhn.fhir.jpa.api.config.DaoConfig; import ca.uhn.fhir.jpa.batch.models.Batch2JobStartResponse; +import ca.uhn.fhir.jpa.model.entity.ResourceHistoryTable; import ca.uhn.fhir.jpa.model.entity.ResourceTable; import ca.uhn.fhir.jpa.searchparam.SearchParameterMap; import ca.uhn.fhir.jpa.test.BaseJpaR4Test; import ca.uhn.fhir.jpa.test.PatientReindexTestHelper; +import ca.uhn.fhir.rest.server.exceptions.ResourceGoneException; import org.hl7.fhir.instance.model.api.IIdType; import org.hl7.fhir.r4.model.Observation; +import org.hl7.fhir.r4.model.Patient; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.Disabled; import org.junit.jupiter.api.Test; @@ -32,6 +36,10 @@ import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.hasSize; import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertNull; +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.junit.jupiter.api.Assertions.fail; public class ReindexJobTest extends BaseJpaR4Test { @@ -53,6 +61,7 @@ public class ReindexJobTest extends BaseJpaR4Test { @AfterEach public void after() { myInterceptorRegistry.unregisterAllAnonymousInterceptors(); + myDaoConfig.setInlineResourceTextBelowSize(new DaoConfig().getInlineResourceTextBelowSize()); } @Test @@ -216,6 +225,82 @@ public class ReindexJobTest extends BaseJpaR4Test { assertEquals("java.lang.Error: foo message", outcome.getErrorMessage()); } + + @Test + public void testOptimizeStorage() { + // Setup + IIdType patientId = createPatient(withActiveTrue()); + for (int i = 0; i < 9; i++) { + createPatient(withActiveTrue()); + } + + runInTransaction(()->{ + assertEquals(10, myResourceHistoryTableDao.count()); + ResourceHistoryTable history = myResourceHistoryTableDao.findAll().get(0); + assertNull(history.getResourceTextVc()); + assertNotNull(history.getResource()); + }); + + myDaoConfig.setInlineResourceTextBelowSize(10000); + + // execute + JobInstanceStartRequest startRequest = new JobInstanceStartRequest(); + startRequest.setJobDefinitionId(ReindexAppCtx.JOB_REINDEX); + startRequest.setParameters( + new ReindexJobParameters() + .setOptimizeStorage(true) + .setReindexSearchParameters(false) + ); + Batch2JobStartResponse startResponse = myJobCoordinator.startInstance(startRequest); + myBatch2JobHelper.awaitJobCompletion(startResponse); + + // validate + runInTransaction(()->{ + assertEquals(10, myResourceHistoryTableDao.count()); + ResourceHistoryTable history = myResourceHistoryTableDao.findAll().get(0); + assertNotNull(history.getResourceTextVc()); + assertNull(history.getResource()); + }); + Patient patient = myPatientDao.read(patientId, mySrd); + assertTrue(patient.getActive()); + + } + + + @Test + public void testOptimizeStorage_DeletedRecords() { + // Setup + IIdType patientId = createPatient(withActiveTrue()); + myPatientDao.delete(patientId, mySrd); + for (int i = 0; i < 9; i++) { + IIdType nextId = createPatient(withActiveTrue()); + myPatientDao.delete(nextId, mySrd); + } + + myDaoConfig.setInlineResourceTextBelowSize(10000); + + // execute + JobInstanceStartRequest startRequest = new JobInstanceStartRequest(); + startRequest.setJobDefinitionId(ReindexAppCtx.JOB_REINDEX); + startRequest.setParameters( + new ReindexJobParameters() + .setOptimizeStorage(true) + .setReindexSearchParameters(false) + ); + Batch2JobStartResponse startResponse = myJobCoordinator.startInstance(startRequest); + JobInstance outcome = myBatch2JobHelper.awaitJobCompletion(startResponse); + assertEquals(10, outcome.getCombinedRecordsProcessed()); + + try { + myPatientDao.read(patientId, mySrd); + fail(); + } catch (ResourceGoneException e) { + // good + } + + } + + private static Stream numResourcesParams(){ return PatientReindexTestHelper.numResourcesParams(); } diff --git a/hapi-fhir-jpaserver-test-r4/src/test/java/ca/uhn/fhir/jpa/stresstest/GiantTransactionPerfTest.java b/hapi-fhir-jpaserver-test-r4/src/test/java/ca/uhn/fhir/jpa/stresstest/GiantTransactionPerfTest.java index b3874e41c5d..beb48826295 100644 --- a/hapi-fhir-jpaserver-test-r4/src/test/java/ca/uhn/fhir/jpa/stresstest/GiantTransactionPerfTest.java +++ b/hapi-fhir-jpaserver-test-r4/src/test/java/ca/uhn/fhir/jpa/stresstest/GiantTransactionPerfTest.java @@ -353,6 +353,11 @@ public class GiantTransactionPerfTest { throw new UnsupportedOperationException(); } + @Override + public ResourceHistoryTable findForIdAndVersion(long theId, long theVersion) { + throw new UnsupportedOperationException(); + } + @Override public ResourceHistoryTable findForIdAndVersionAndFetchProvenance(long theId, long theVersion) { throw new UnsupportedOperationException(); @@ -388,6 +393,11 @@ public class GiantTransactionPerfTest { throw new UnsupportedOperationException(); } + @Override + public void setResourceTextVcForVersion(Long id, String resourceText) { + throw new UnsupportedOperationException(); + } + @Override public List findAll() { throw new UnsupportedOperationException(); diff --git a/hapi-fhir-storage-batch2-jobs/src/main/java/ca/uhn/fhir/batch2/jobs/imprt/ConsumeFilesStep.java b/hapi-fhir-storage-batch2-jobs/src/main/java/ca/uhn/fhir/batch2/jobs/imprt/ConsumeFilesStep.java index 38cdde4813d..663504945d1 100644 --- a/hapi-fhir-storage-batch2-jobs/src/main/java/ca/uhn/fhir/batch2/jobs/imprt/ConsumeFilesStep.java +++ b/hapi-fhir-storage-batch2-jobs/src/main/java/ca/uhn/fhir/batch2/jobs/imprt/ConsumeFilesStep.java @@ -133,7 +133,7 @@ public class ConsumeFilesStep implements ILastJobStepWorker extends IDao { MT metaGetOperation(Class theType, RequestDetails theRequestDetails); /** - * FIXME GGG This is a temporary hack to test whether the baby reindex works. + * This will be implemented in a better way in 6.6.0 (or later). This method will go away at that time. */ - void migrateLogToVarChar(IResourcePersistentId theResourcePersistentId); + default void migrateLobToVarChar(IResourcePersistentId theResourcePersistentId) { + // nothing + } /** * Opens a new transaction and performs a patch operation diff --git a/hapi-fhir-storage/src/main/java/ca/uhn/fhir/jpa/api/dao/IFhirSystemDao.java b/hapi-fhir-storage/src/main/java/ca/uhn/fhir/jpa/api/dao/IFhirSystemDao.java index 6ab835f88d6..aba19ba0e03 100644 --- a/hapi-fhir-storage/src/main/java/ca/uhn/fhir/jpa/api/dao/IFhirSystemDao.java +++ b/hapi-fhir-storage/src/main/java/ca/uhn/fhir/jpa/api/dao/IFhirSystemDao.java @@ -90,7 +90,7 @@ public interface IFhirSystemDao extends IDao { * Preload resources from the database in batch. This method is purely * a performance optimization and must be purely idempotent. */ - default

void preFetchResources(List

theResolvedIds) { + default

void preFetchResources(List

theResolvedIds, boolean thePreFetchIndexes) { // nothing by default } }