Rework reindex
parent 27e5c41498
commit d952c87d9b
@@ -140,6 +140,7 @@ import javax.persistence.NoResultException;
 import javax.persistence.TypedQuery;
 import javax.servlet.http.HttpServletResponse;
 import java.io.IOException;
 import java.nio.charset.StandardCharsets;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Date;
@@ -1661,34 +1662,25 @@ public abstract class BaseHapiFhirResourceDao<T extends IBaseResource> extends B
 	}
 
 	@Override
-	public void migrateLogToVarChar(IResourcePersistentId<?> theResourcePersistentId) {
+	public void migrateLobToVarChar(IResourcePersistentId<?> theResourcePersistentId) {
 		Long id = ((JpaPid)theResourcePersistentId).getId();
-		ResourceTable entity =
-			myEntityManager.find(ResourceTable.class, id, LockModeType.OPTIMISTIC);
+		ResourceTable entity = myResourceTableDao.findById(id).orElse(null);
 		if (entity == null) {
 			ourLog.warn("Unable to find entity with PID: {}", id);
 		} else {
-			IBaseResource resource = myJpaStorageResourceParser.toResource(entity, false);
 			ResourceHistoryTable historyEntity = entity.getCurrentVersionEntity();
-			ResourceEncodingEnum encoding = getConfig().getResourceEncoding();
-			List<String> excludeElements = new ArrayList<>(8);
-			getExcludedElements(historyEntity.getResourceType(), excludeElements, resource.getMeta());
-			String encodedResourceString = encodeResource(resource, encoding, excludeElements, myFhirContext);
-
-
-			historyEntity = myEntityManager.merge(historyEntity);
-			if (getConfig().getInlineResourceTextBelowSize() > 0 && encodedResourceString.length() < getConfig().getInlineResourceTextBelowSize()) {
-				historyEntity.setResourceTextVc(encodedResourceString);
-				historyEntity.setResource(null);
-			} else {
-				historyEntity.setResourceTextVc(null);
-				byte[] resourceBinary = getResourceBinary(encoding, encodedResourceString);
-				historyEntity.setResource(resourceBinary);
-				historyEntity.setResourceTextVc(null);
+			if (historyEntity != null) {
+				if (historyEntity.getEncoding() == ResourceEncodingEnum.JSONC || historyEntity.getEncoding() == ResourceEncodingEnum.JSON) {
+					byte[] resourceBytes = historyEntity.getResource();
+					if (resourceBytes != null) {
+						String resourceText = decodeResource(resourceBytes, historyEntity.getEncoding());
+						if (getConfig().getInlineResourceTextBelowSize() > 0 && resourceText.length() < getConfig().getInlineResourceTextBelowSize()) {
+							ourLog.info("Storing text of resource {} version {} as inline VARCHAR", entity.getResourceId(), entity.getVersion());
+							myResourceHistoryTableDao.setResourceTextVcForVersion(historyEntity.getId(), resourceText);
+						}
+					}
+				}
 			}
 
-			historyEntity.setEncoding(encoding);
-			myResourceHistoryTableDao.save(historyEntity);
 		}
 	}
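The rename from `migrateLogToVarChar` to `migrateLobToVarChar` fixes a typo ("Lob" as in large object), and the rework stops round-tripping the resource through the parser: rather than re-encoding the resource and merging the entity under an optimistic lock, the new code decodes the existing JSON/JSONC LOB bytes and, when the text fits under the configured threshold, moves it to the inline VARCHAR column with a single modifying query. The inlining rule itself reduces to a simple predicate; a minimal sketch (the helper name is ours, not from the commit):

```java
// Hypothetical helper mirroring the threshold check above: a limit of 0
// disables inlining; otherwise the encoded text must be strictly shorter
// than the configured limit to qualify for inline VARCHAR storage.
static boolean shouldInlineAsVarChar(String resourceText, int inlineBelowSize) {
	return inlineBelowSize > 0 && resourceText.length() < inlineBelowSize;
}
```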
@@ -157,7 +157,7 @@ public abstract class BaseHapiFhirSystemDao<T extends IBaseBundle, MT> extends B
 
 	@Override
 	@Transactional(propagation = Propagation.MANDATORY)
-	public <P extends IResourcePersistentId> void preFetchResources(List<P> theResolvedIds) {
+	public <P extends IResourcePersistentId> void preFetchResources(List<P> theResolvedIds, boolean thePreFetchIndexes) {
 		List<Long> pids = theResolvedIds
 			.stream()
 			.map(t -> ((JpaPid) t).getId())
@@ -182,40 +182,42 @@ public abstract class BaseHapiFhirSystemDao<T extends IBaseBundle, MT> extends B
 
 		List<Long> entityIds;
 
-		entityIds = loadedResourceTableEntries.stream().filter(t -> t.isParamsStringPopulated()).map(t->t.getId()).collect(Collectors.toList());
-		if (entityIds.size() > 0) {
-			preFetchIndexes(entityIds, "string", "myParamsString", null);
-		}
+		if (thePreFetchIndexes) {
+			entityIds = loadedResourceTableEntries.stream().filter(t -> t.isParamsStringPopulated()).map(t -> t.getId()).collect(Collectors.toList());
+			if (entityIds.size() > 0) {
+				preFetchIndexes(entityIds, "string", "myParamsString", null);
+			}
 
-		entityIds = loadedResourceTableEntries.stream().filter(t -> t.isParamsTokenPopulated()).map(t->t.getId()).collect(Collectors.toList());
-		if (entityIds.size() > 0) {
-			preFetchIndexes(entityIds, "token", "myParamsToken", null);
-		}
+			entityIds = loadedResourceTableEntries.stream().filter(t -> t.isParamsTokenPopulated()).map(t -> t.getId()).collect(Collectors.toList());
+			if (entityIds.size() > 0) {
+				preFetchIndexes(entityIds, "token", "myParamsToken", null);
+			}
 
-		entityIds = loadedResourceTableEntries.stream().filter(t -> t.isParamsDatePopulated()).map(t->t.getId()).collect(Collectors.toList());
-		if (entityIds.size() > 0) {
-			preFetchIndexes(entityIds, "date", "myParamsDate", null);
-		}
+			entityIds = loadedResourceTableEntries.stream().filter(t -> t.isParamsDatePopulated()).map(t -> t.getId()).collect(Collectors.toList());
+			if (entityIds.size() > 0) {
+				preFetchIndexes(entityIds, "date", "myParamsDate", null);
+			}
 
-		entityIds = loadedResourceTableEntries.stream().filter(t -> t.isParamsQuantityPopulated()).map(t->t.getId()).collect(Collectors.toList());
-		if (entityIds.size() > 0) {
-			preFetchIndexes(entityIds, "quantity", "myParamsQuantity", null);
-		}
+			entityIds = loadedResourceTableEntries.stream().filter(t -> t.isParamsQuantityPopulated()).map(t -> t.getId()).collect(Collectors.toList());
+			if (entityIds.size() > 0) {
+				preFetchIndexes(entityIds, "quantity", "myParamsQuantity", null);
+			}
 
-		entityIds = loadedResourceTableEntries.stream().filter(t -> t.isHasLinks()).map(t->t.getId()).collect(Collectors.toList());
-		if (entityIds.size() > 0) {
-			preFetchIndexes(entityIds, "resourceLinks", "myResourceLinks", null);
-		}
+			entityIds = loadedResourceTableEntries.stream().filter(t -> t.isHasLinks()).map(t -> t.getId()).collect(Collectors.toList());
+			if (entityIds.size() > 0) {
+				preFetchIndexes(entityIds, "resourceLinks", "myResourceLinks", null);
+			}
 
-		entityIds = loadedResourceTableEntries.stream().filter(t -> t.isHasTags()).map(t->t.getId()).collect(Collectors.toList());
-		if (entityIds.size() > 0) {
-			myResourceTagDao.findByResourceIds(entityIds);
-			preFetchIndexes(entityIds, "tags", "myTags", null);
-		}
+			entityIds = loadedResourceTableEntries.stream().filter(t -> t.isHasTags()).map(t -> t.getId()).collect(Collectors.toList());
+			if (entityIds.size() > 0) {
+				myResourceTagDao.findByResourceIds(entityIds);
+				preFetchIndexes(entityIds, "tags", "myTags", null);
+			}
 
-		entityIds = loadedResourceTableEntries.stream().map(t->t.getId()).collect(Collectors.toList());
-		if (myDaoConfig.getIndexMissingFields() == DaoConfig.IndexEnabledEnum.ENABLED) {
-			preFetchIndexes(entityIds, "searchParamPresence", "mySearchParamPresents", null);
-		}
+			entityIds = loadedResourceTableEntries.stream().map(t -> t.getId()).collect(Collectors.toList());
+			if (myDaoConfig.getIndexMissingFields() == DaoConfig.IndexEnabledEnum.ENABLED) {
+				preFetchIndexes(entityIds, "searchParamPresence", "mySearchParamPresents", null);
+			}
+		}
 
 		new QueryChunker<ResourceTable>().chunk(loadedResourceTableEntries, SearchBuilder.getMaximumPageSize() / 2, entries -> {
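Every index type here follows the same filter-collect-prefetch shape, now guarded by the new `thePreFetchIndexes` flag. As a hypothetical consolidation of that repetition (the helper and its functional parameters are ours, not part of the commit), the pattern could be expressed once:

```java
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.stream.Collectors;

class PrefetchSketch {
	// Collects the IDs of entries matching the predicate and hands them to
	// the prefetch callback only when at least one ID matched, mirroring the
	// "if (entityIds.size() > 0)" guard repeated above.
	static <T> void prefetchIfAny(List<T> theEntries, Predicate<T> theIsPopulated,
											Function<T, Long> theIdOf, Consumer<List<Long>> thePrefetch) {
		List<Long> ids = theEntries.stream().filter(theIsPopulated).map(theIdOf).collect(Collectors.toList());
		if (!ids.isEmpty()) {
			thePrefetch.accept(ids);
		}
	}
}
```

A call site would then read, for example, `prefetchIfAny(loadedResourceTableEntries, ResourceTable::isParamsStringPopulated, ResourceTable::getId, ids -> preFetchIndexes(ids, "string", "myParamsString", null));`.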
@@ -276,7 +276,7 @@ public class TransactionProcessor extends BaseTransactionProcessor {
 		}
 
 		IFhirSystemDao<?,?> systemDao = myApplicationContext.getBean(IFhirSystemDao.class);
-		systemDao.preFetchResources(JpaPid.fromLongList(idsToPreFetch));
+		systemDao.preFetchResources(JpaPid.fromLongList(idsToPreFetch), true);
 
 	}
@@ -39,6 +39,8 @@ public interface IResourceHistoryTableDao extends JpaRepository<ResourceHistoryT
 	@Query("SELECT t FROM ResourceHistoryTable t WHERE t.myResourceId = :resId ORDER BY t.myResourceVersion ASC")
 	List<ResourceHistoryTable> findAllVersionsForResourceIdInOrder(@Param("resId") Long theId);
 
+	@Query("SELECT t FROM ResourceHistoryTable t WHERE t.myResourceId = :id AND t.myResourceVersion = :version")
+	ResourceHistoryTable findForIdAndVersion(@Param("id") long theId, @Param("version") long theVersion);
+
 	@Query("SELECT t FROM ResourceHistoryTable t LEFT OUTER JOIN FETCH t.myProvenance WHERE t.myResourceId = :id AND t.myResourceVersion = :version")
 	ResourceHistoryTable findForIdAndVersionAndFetchProvenance(@Param("id") long theId, @Param("version") long theVersion);
@@ -73,4 +75,8 @@ public interface IResourceHistoryTableDao extends JpaRepository<ResourceHistoryT
 	@Modifying
 	@Query("DELETE FROM ResourceHistoryTable t WHERE t.myId = :pid")
 	void deleteByPid(@Param("pid") Long theId);
+
+	@Modifying
+	@Query("UPDATE ResourceHistoryTable as t SET t.myResource = null, t.myResourceTextVc = :text WHERE t.myId = :pid")
+	void setResourceTextVcForVersion(@Param("pid") Long id, @Param("text") String resourceText);
 }
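The new `setResourceTextVcForVersion` is a bulk JPQL UPDATE: it nulls the LOB column and writes the inline text in a single statement, without loading the entity. Spring Data JPA requires `@Modifying` on such queries, and bulk statements bypass the persistence context, so callers should not rely on an already-loaded `ResourceHistoryTable` instance staying in sync. A hedged usage sketch (the wrapper class is ours, not from the commit):

```java
import org.springframework.transaction.annotation.Transactional;

class InlineMigrationSketch {
	private final IResourceHistoryTableDao myResourceHistoryTableDao;

	InlineMigrationSketch(IResourceHistoryTableDao theDao) {
		myResourceHistoryTableDao = theDao;
	}

	// Runs the single-statement UPDATE inside an active transaction; any
	// first-level-cache copy of the affected row is stale afterwards.
	@Transactional
	void inline(Long theVersionPid, String theResourceText) {
		myResourceHistoryTableDao.setResourceTextVcForVersion(theVersionPid, theResourceText);
	}
}
```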
@@ -1,15 +1,24 @@
 package ca.uhn.fhir.jpa.dao.r4;
 
+import ca.uhn.fhir.batch2.api.IJobDataSink;
+import ca.uhn.fhir.batch2.api.RunOutcome;
+import ca.uhn.fhir.batch2.jobs.chunk.ResourceIdListWorkChunkJson;
+import ca.uhn.fhir.batch2.jobs.reindex.ReindexAppCtx;
+import ca.uhn.fhir.batch2.jobs.reindex.ReindexJobParameters;
+import ca.uhn.fhir.batch2.jobs.reindex.ReindexStep;
+import ca.uhn.fhir.batch2.model.JobInstanceStartRequest;
 import ca.uhn.fhir.context.FhirContext;
 import ca.uhn.fhir.context.support.ValidationSupportContext;
 import ca.uhn.fhir.context.support.ValueSetExpansionOptions;
 import ca.uhn.fhir.jpa.api.config.DaoConfig;
 import ca.uhn.fhir.jpa.api.model.HistoryCountModeEnum;
+import ca.uhn.fhir.jpa.batch.models.Batch2JobStartResponse;
 import ca.uhn.fhir.jpa.dao.data.ISearchParamPresentDao;
 import ca.uhn.fhir.jpa.entity.TermValueSet;
 import ca.uhn.fhir.jpa.entity.TermValueSetPreExpansionStatusEnum;
 import ca.uhn.fhir.jpa.model.entity.ForcedId;
 import ca.uhn.fhir.jpa.model.entity.ModelConfig;
+import ca.uhn.fhir.jpa.model.entity.ResourceHistoryTable;
 import ca.uhn.fhir.jpa.model.entity.ResourceTable;
 import ca.uhn.fhir.jpa.model.util.JpaConstants;
 import ca.uhn.fhir.jpa.provider.BaseResourceProviderR4Test;
@@ -87,8 +96,11 @@ import static org.hamcrest.Matchers.containsString;
 import static org.hamcrest.Matchers.empty;
 import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertNull;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 import static org.junit.jupiter.api.Assertions.fail;
 import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
 @TestMethodOrder(MethodOrderer.MethodName.class)
@@ -125,6 +137,7 @@ public class FhirResourceDaoR4QueryCountTest extends BaseResourceProviderR4Test
 		myDaoConfig.setResourceClientIdStrategy(new DaoConfig().getResourceClientIdStrategy());
 		myDaoConfig.setTagStorageMode(new DaoConfig().getTagStorageMode());
 		myDaoConfig.clearSupportedSubscriptionTypesForUnitTest();
+		myDaoConfig.setInlineResourceTextBelowSize(new DaoConfig().getInlineResourceTextBelowSize());
 
 		TermReadSvcImpl.setForceDisableHibernateSearchForUnitTest(false);
 	}
@@ -810,6 +823,94 @@ public class FhirResourceDaoR4QueryCountTest extends BaseResourceProviderR4Test
 		assertEquals(0, myCaptureQueriesListener.getDeleteQueriesForCurrentThread().size());
 	}
 
+	@Autowired
+	private ReindexStep myReindexStep;
+
+	@Test
+	public void testReindexJob_OptimizeStorage() {
+		// Setup
+		ResourceIdListWorkChunkJson data = new ResourceIdListWorkChunkJson();
+		IIdType patientId = createPatient(withActiveTrue());
+		data.addTypedPid("Patient", patientId.getIdPartAsLong());
+		for (int i = 0; i < 9; i++) {
+			IIdType nextPatientId = createPatient(withActiveTrue());
+			data.addTypedPid("Patient", nextPatientId.getIdPartAsLong());
+		}
+
+		runInTransaction(()->{
+			assertEquals(10, myResourceHistoryTableDao.count());
+			ResourceHistoryTable history = myResourceHistoryTableDao.findAll().get(0);
+			assertNull(history.getResourceTextVc());
+			assertNotNull(history.getResource());
+		});
+
+		myDaoConfig.setInlineResourceTextBelowSize(10000);
+		ReindexJobParameters params = new ReindexJobParameters()
+			.setOptimizeStorage(true)
+			.setReindexSearchParameters(false);
+
+		// execute
+		myCaptureQueriesListener.clear();
+		RunOutcome outcome = myReindexStep.doReindex(data, mock(IJobDataSink.class), "123", "456", params);
+
+		// validate
+		myCaptureQueriesListener.logSelectQueriesForCurrentThread();
+		assertEquals(2, myCaptureQueriesListener.getSelectQueriesForCurrentThread().size());
+		assertEquals(10, myCaptureQueriesListener.getUpdateQueriesForCurrentThread().size());
+		assertEquals(0, myCaptureQueriesListener.getInsertQueriesForCurrentThread().size());
+		assertEquals(0, myCaptureQueriesListener.getDeleteQueriesForCurrentThread().size());
+
+		assertEquals(10, outcome.getRecordsProcessed());
+		runInTransaction(()->{
+			assertEquals(10, myResourceHistoryTableDao.count());
+			ResourceHistoryTable history = myResourceHistoryTableDao.findAll().get(0);
+			assertNotNull(history.getResourceTextVc());
+			assertNull(history.getResource());
+		});
+		Patient patient = myPatientDao.read(patientId, mySrd);
+		assertTrue(patient.getActive());
+
+	}
+
+	@Test
+	public void testReindexJob_OptimizeStorage_NoOp() {
+		// Setup
+
+		// Inlined already, so no reindexing needed
+		myDaoConfig.setInlineResourceTextBelowSize(10000);
+
+		ResourceIdListWorkChunkJson data = new ResourceIdListWorkChunkJson();
+		IIdType patientId = createPatient(withActiveTrue());
+		data.addTypedPid("Patient", patientId.getIdPartAsLong());
+		for (int i = 0; i < 9; i++) {
+			IIdType nextPatientId = createPatient(withActiveTrue());
+			data.addTypedPid("Patient", nextPatientId.getIdPartAsLong());
+		}
+
+		runInTransaction(()->{
+			assertEquals(10, myResourceHistoryTableDao.count());
+			ResourceHistoryTable history = myResourceHistoryTableDao.findAll().get(0);
+			assertNotNull(history.getResourceTextVc());
+			assertNull(history.getResource());
+		});
+
+		ReindexJobParameters params = new ReindexJobParameters()
+			.setOptimizeStorage(true)
+			.setReindexSearchParameters(false);
+
+		// execute
+		myCaptureQueriesListener.clear();
+		RunOutcome outcome = myReindexStep.doReindex(data, mock(IJobDataSink.class), "123", "456", params);
+
+		// validate
+		myCaptureQueriesListener.logSelectQueriesForCurrentThread();
+		assertEquals(2, myCaptureQueriesListener.getSelectQueriesForCurrentThread().size());
+		assertEquals(0, myCaptureQueriesListener.getUpdateQueriesForCurrentThread().size());
+		assertEquals(0, myCaptureQueriesListener.getInsertQueriesForCurrentThread().size());
+		assertEquals(0, myCaptureQueriesListener.getDeleteQueriesForCurrentThread().size());
+
+	}
+
 	@Test
 	public void testSearchAndPageThroughResults_SmallChunksOnSameBundleProvider() {
@@ -8,13 +8,17 @@ import ca.uhn.fhir.batch2.model.JobInstanceStartRequest;
 import ca.uhn.fhir.batch2.model.StatusEnum;
 import ca.uhn.fhir.interceptor.api.IAnonymousInterceptor;
 import ca.uhn.fhir.interceptor.api.Pointcut;
+import ca.uhn.fhir.jpa.api.config.DaoConfig;
 import ca.uhn.fhir.jpa.batch.models.Batch2JobStartResponse;
+import ca.uhn.fhir.jpa.model.entity.ResourceHistoryTable;
 import ca.uhn.fhir.jpa.model.entity.ResourceTable;
 import ca.uhn.fhir.jpa.searchparam.SearchParameterMap;
 import ca.uhn.fhir.jpa.test.BaseJpaR4Test;
 import ca.uhn.fhir.jpa.test.PatientReindexTestHelper;
+import ca.uhn.fhir.rest.server.exceptions.ResourceGoneException;
 import org.hl7.fhir.instance.model.api.IIdType;
 import org.hl7.fhir.r4.model.Observation;
+import org.hl7.fhir.r4.model.Patient;
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.Disabled;
 import org.junit.jupiter.api.Test;
@@ -32,6 +36,10 @@ import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.Matchers.equalTo;
 import static org.hamcrest.Matchers.hasSize;
 import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.junit.jupiter.api.Assertions.fail;
 
 public class ReindexJobTest extends BaseJpaR4Test {
 
@@ -53,6 +61,7 @@ public class ReindexJobTest extends BaseJpaR4Test {
 	@AfterEach
 	public void after() {
 		myInterceptorRegistry.unregisterAllAnonymousInterceptors();
+		myDaoConfig.setInlineResourceTextBelowSize(new DaoConfig().getInlineResourceTextBelowSize());
 	}
 
 	@Test
@@ -216,6 +225,82 @@ public class ReindexJobTest extends BaseJpaR4Test {
 		assertEquals("java.lang.Error: foo message", outcome.getErrorMessage());
 	}
 
+	@Test
+	public void testOptimizeStorage() {
+		// Setup
+		IIdType patientId = createPatient(withActiveTrue());
+		for (int i = 0; i < 9; i++) {
+			createPatient(withActiveTrue());
+		}
+
+		runInTransaction(()->{
+			assertEquals(10, myResourceHistoryTableDao.count());
+			ResourceHistoryTable history = myResourceHistoryTableDao.findAll().get(0);
+			assertNull(history.getResourceTextVc());
+			assertNotNull(history.getResource());
+		});
+
+		myDaoConfig.setInlineResourceTextBelowSize(10000);
+
+		// execute
+		JobInstanceStartRequest startRequest = new JobInstanceStartRequest();
+		startRequest.setJobDefinitionId(ReindexAppCtx.JOB_REINDEX);
+		startRequest.setParameters(
+			new ReindexJobParameters()
+				.setOptimizeStorage(true)
+				.setReindexSearchParameters(false)
+		);
+		Batch2JobStartResponse startResponse = myJobCoordinator.startInstance(startRequest);
+		myBatch2JobHelper.awaitJobCompletion(startResponse);
+
+		// validate
+		runInTransaction(()->{
+			assertEquals(10, myResourceHistoryTableDao.count());
+			ResourceHistoryTable history = myResourceHistoryTableDao.findAll().get(0);
+			assertNotNull(history.getResourceTextVc());
+			assertNull(history.getResource());
+		});
+		Patient patient = myPatientDao.read(patientId, mySrd);
+		assertTrue(patient.getActive());
+
+	}
+
+	@Test
+	public void testOptimizeStorage_DeletedRecords() {
+		// Setup
+		IIdType patientId = createPatient(withActiveTrue());
+		myPatientDao.delete(patientId, mySrd);
+		for (int i = 0; i < 9; i++) {
+			IIdType nextId = createPatient(withActiveTrue());
+			myPatientDao.delete(nextId, mySrd);
+		}
+
+		myDaoConfig.setInlineResourceTextBelowSize(10000);
+
+		// execute
+		JobInstanceStartRequest startRequest = new JobInstanceStartRequest();
+		startRequest.setJobDefinitionId(ReindexAppCtx.JOB_REINDEX);
+		startRequest.setParameters(
+			new ReindexJobParameters()
+				.setOptimizeStorage(true)
+				.setReindexSearchParameters(false)
+		);
+		Batch2JobStartResponse startResponse = myJobCoordinator.startInstance(startRequest);
+		JobInstance outcome = myBatch2JobHelper.awaitJobCompletion(startResponse);
+		assertEquals(10, outcome.getCombinedRecordsProcessed());
+
+		try {
+			myPatientDao.read(patientId, mySrd);
+			fail();
+		} catch (ResourceGoneException e) {
+			// good
+		}
+
+	}
+
 	private static Stream<Arguments> numResourcesParams(){
 		return PatientReindexTestHelper.numResourcesParams();
 	}
@@ -353,6 +353,11 @@ public class GiantTransactionPerfTest {
 			throw new UnsupportedOperationException();
 		}
 
+		@Override
+		public ResourceHistoryTable findForIdAndVersion(long theId, long theVersion) {
+			throw new UnsupportedOperationException();
+		}
+
 		@Override
 		public ResourceHistoryTable findForIdAndVersionAndFetchProvenance(long theId, long theVersion) {
 			throw new UnsupportedOperationException();
@@ -388,6 +393,11 @@ public class GiantTransactionPerfTest {
 			throw new UnsupportedOperationException();
 		}
 
+		@Override
+		public void setResourceTextVcForVersion(Long id, String resourceText) {
+			throw new UnsupportedOperationException();
+		}
+
 		@Override
 		public List<ResourceHistoryTable> findAll() {
 			throw new UnsupportedOperationException();
@@ -133,7 +133,7 @@ public class ConsumeFilesStep implements ILastJobStepWorker<BulkImportJobParamet
 			theTransactionDetails.addResolvedResourceId(next, null);
 		}
 
-		mySystemDao.preFetchResources(resolvedIds);
+		mySystemDao.preFetchResources(resolvedIds, true);
 
 		for (IBaseResource next : theResources) {
 			updateResource(theRequestDetails, theTransactionDetails, next);
@@ -44,16 +44,18 @@ public class ReindexJobParameters extends PartitionedUrlListJobParameters {
 		return defaultIfNull(myOptimizeStorage, Boolean.FALSE);
 	}
 
-	public void setOptimizeStorage(boolean myOptimizeStorage) {
+	public ReindexJobParameters setOptimizeStorage(boolean myOptimizeStorage) {
 		this.myOptimizeStorage = myOptimizeStorage;
+		return this;
 	}
 
 	public boolean isReindexSearchParameters() {
 		return defaultIfNull(myReindexSearchParameters, Boolean.TRUE);
 	}
 
-	public void setReindexSearchParameters(boolean myReindexSearchParameters) {
+	public ReindexJobParameters setReindexSearchParameters(boolean myReindexSearchParameters) {
 		this.myReindexSearchParameters = myReindexSearchParameters;
+		return this;
 	}
 
 }
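Returning `this` from the setters enables the fluent construction style the new tests rely on, taken directly from the test code in this commit:

```java
// Fluent job-parameter construction, as used by the new reindex tests:
ReindexJobParameters params = new ReindexJobParameters()
	.setOptimizeStorage(true)
	.setReindexSearchParameters(false);
```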
@@ -119,7 +119,7 @@ public class ReindexStep implements IJobStepWorker<ReindexJobParameters, Resourc
 
 		// Prefetch Resources from DB
 
-		mySystemDao.preFetchResources(persistentIds);
+		mySystemDao.preFetchResources(persistentIds, myJobParameters.isReindexSearchParameters());
 		ourLog.info("Prefetched {} resources in {} - Instance[{}] Chunk[{}]", persistentIds.size(), sw, myInstanceId, myChunkId);
 
 		// Reindex
@@ -135,7 +135,7 @@ public class ReindexStep implements IJobStepWorker<ReindexJobParameters, Resourc
 					dao.reindex(resourcePersistentId, myRequestDetails, myTransactionDetails);
 				}
 				if (myJobParameters.isOptimizeStorage()) {
-					dao.migrateLogToVarChar(resourcePersistentId);
+					dao.migrateLobToVarChar(resourcePersistentId);
 				}
 			} catch (BaseServerResponseException | DataFormatException e) {
 				String resourceForcedId = myIdHelperService.translatePidIdToForcedIdWithCache(resourcePersistentId).orElse(resourcePersistentId.toString());
@@ -169,9 +169,11 @@ public interface IFhirResourceDao<T extends IBaseResource> extends IDao {
 	<MT extends IBaseMetaType> MT metaGetOperation(Class<MT> theType, RequestDetails theRequestDetails);
 
 	/**
 	 * FIXME GGG This is a temporary hack to test whether the baby reindex works.
 	 * This will be implemented in a better way in 6.6.0 (or later). This method will go away at that time.
 	 */
-	void migrateLogToVarChar(IResourcePersistentId<?> theResourcePersistentId);
+	default void migrateLobToVarChar(IResourcePersistentId<?> theResourcePersistentId) {
+		// nothing
+	}
 
 	/**
 	 * Opens a new transaction and performs a patch operation
@@ -90,7 +90,7 @@ public interface IFhirSystemDao<T, MT> extends IDao {
 	 * Preload resources from the database in batch. This method is purely
 	 * a performance optimization and must be purely idempotent.
 	 */
-	default <P extends IResourcePersistentId> void preFetchResources(List<P> theResolvedIds) {
+	default <P extends IResourcePersistentId> void preFetchResources(List<P> theResolvedIds, boolean thePreFetchIndexes) {
 		// nothing by default
 	}
 }
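The extra boolean lets callers skip prefetching the search-index tables when those rows will never be read. In this commit the transaction-processing and bulk-import paths pass `true`, while the reindex step forwards its job parameter, so a storage-only reindex (optimize storage without reindexing search parameters) avoids loading index rows it will not touch:

```java
// As wired by the hunks above (all three call sites appear in this commit):
systemDao.preFetchResources(JpaPid.fromLongList(idsToPreFetch), true);  // TransactionProcessor
mySystemDao.preFetchResources(resolvedIds, true);                       // ConsumeFilesStep (bulk import)
mySystemDao.preFetchResources(persistentIds, myJobParameters.isReindexSearchParameters()); // ReindexStep
```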