fix reindex optimizeStorage=ALL_VERSIONS (#6421)

Emre Dincturk 2024-10-31 12:03:05 -04:00 committed by GitHub
parent 5fdce4e1b8
commit 59b975068a
7 changed files with 76 additions and 5 deletions


@@ -0,0 +1,6 @@
+---
+type: fix
+issue: 6419
+title: "Previously, on Postgres, the `$reindex` operation with `optimizeStorage` set to `ALL_VERSIONS` would process
+  only a subset of versions if there were more than 100 versions to be processed for a resource. This has been fixed
+  so that all versions of the resource are now processed."
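A note on the mechanism, since the changelog only states the symptom: the reindex loop pages through version rows with page-number (LIMIT/OFFSET-style) queries while rewriting the very rows it pages over. Without an explicit ORDER BY, Postgres is free to hand back already-updated rows again on a later page while some untouched rows never surface at all. The plain-Java sketch below simulates that effect; the `UnstablePagingDemo` class, its `Row` record, and the "updated rows relocate to the end" rule are illustrative assumptions, not HAPI code.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

// Demonstrates why OFFSET-based paging skips rows when the pass itself
// mutates them. The sort before each page stands in for a database with
// no ORDER BY: updated rows relocate (here: to the end), so they reappear
// on later pages while not-yet-visited rows shift backward into the
// already-read window and are skipped.
public class UnstablePagingDemo {

	private record Row(long id, boolean processed) {}

	public static void main(String[] args) {
		List<Row> table = new ArrayList<>();
		for (long i = 0; i < 250; i++) {
			table.add(new Row(i, false));
		}

		int pageSize = 100;
		for (int page = 0; (long) page * pageSize < table.size(); page++) {
			// Unprocessed rows first: mimics updated tuples moving to the
			// end of the heap between page fetches.
			table.sort(Comparator.comparing(Row::processed));
			int from = page * pageSize;
			int to = Math.min(from + pageSize, table.size());
			for (int i = from; i < to; i++) {
				table.set(i, new Row(table.get(i).id(), true));
			}
		}

		long missed = table.stream().filter(r -> !r.processed()).count();
		System.out.println("Rows never processed: " + missed); // prints 100
	}
}
```

Run as-is, this reports 100 of 250 rows never visited, which is exactly the shape of the bug: a resource with more than one page of versions could finish `$reindex` with only part of its history optimized.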


@@ -0,0 +1,5 @@
+---
+type: fix
+issue: 6420
+title: "Previously, when the `$reindex` operation was run for a single FHIR resource with `optimizeStorage` set to
+  `ALL_VERSIONS`, none of the versions of the resource were processed in the `hfj_res_ver` table. This has been fixed."


@@ -133,6 +133,7 @@ import org.hl7.fhir.r4.model.Parameters.ParametersParameterComponent;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.data.domain.PageRequest;
 import org.springframework.data.domain.Slice;
+import org.springframework.data.domain.Sort;
 import org.springframework.transaction.PlatformTransactionManager;
 import org.springframework.transaction.annotation.Propagation;
 import org.springframework.transaction.annotation.Transactional;
@@ -1693,9 +1694,15 @@ public abstract class BaseHapiFhirResourceDao<T extends IBaseResource> extends B
 if (theOptimizeStorageMode == ReindexParameters.OptimizeStorageModeEnum.ALL_VERSIONS) {
     int pageSize = 100;
     for (int page = 0; ((long) page * pageSize) < entity.getVersion(); page++) {
+
+        // We need to sort the pages, because we are updating the same data we are paging through.
+        // If not sorted explicitly, a database like Postgres returns the same data multiple times on
+        // different pages as the underlying data gets updated.
+        PageRequest pageRequest = PageRequest.of(page, pageSize, Sort.by("myId"));
+
         Slice<ResourceHistoryTable> historyEntities =
                 myResourceHistoryTableDao.findForResourceIdAndReturnEntitiesAndFetchProvenance(
-                        PageRequest.of(page, pageSize), entity.getId(), historyEntity.getVersion());
+                        pageRequest, entity.getId(), historyEntity.getVersion());
         for (ResourceHistoryTable next : historyEntities) {
             reindexOptimizeStorageHistoryEntity(entity, next);
         }
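The fix pins every page to one deterministic global ordering by sorting on the entity's id field. Here is that pattern in isolation, as a minimal sketch assuming a plain Spring Data setup (`StablePageRequests` is illustrative, not HAPI code; `"myId"` follows HAPI's JPA entity-field naming):

```java
import org.springframework.data.domain.PageRequest;
import org.springframework.data.domain.Sort;

// Each PageRequest built this way carries the same ORDER BY (on the JPA
// entity field "myId"), so page N+1 starts exactly where page N ended,
// even when row contents change between fetches.
public class StablePageRequests {
	public static void main(String[] args) {
		int pageSize = 100;
		for (int page = 0; page < 3; page++) {
			PageRequest request = PageRequest.of(page, pageSize, Sort.by(Sort.Direction.ASC, "myId"));
			System.out.println(request);
		}
	}
}
```

Because the optimize-storage pass rewrites row contents but never the id, the ORDER BY stays stable across pages. Keyset pagination (`WHERE my_id > :lastSeen ORDER BY my_id`) would avoid the interaction as well, but the page-number form used here is the smaller change.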


@@ -205,7 +205,7 @@ public abstract class BaseHapiFhirSystemDao<T extends IBaseBundle, MT> extends B
  *
  * However, for realistic average workloads, this should reduce the number of round trips.
  */
-if (idChunk.size() >= 2) {
+if (!idChunk.isEmpty()) {
     List<ResourceTable> entityChunk = prefetchResourceTableHistoryAndProvenance(idChunk);
     if (thePreFetchIndexes) {
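The old `idChunk.size() >= 2` guard reads like a batching heuristic (prefetching only pays off for two or more ids), but it also meant a chunk holding exactly one resource never reached `prefetchResourceTableHistoryAndProvenance`, which is presumably how the single-resource case in issue 6420 lost its history processing. The boundary case, spelled out as a trivial standalone sketch (not HAPI code):

```java
import java.util.List;

// Old vs. new guard at the single-resource boundary.
public class ChunkGuardDemo {
	public static void main(String[] args) {
		List<Long> idChunk = List.of(42L); // one resource in the whole reindex
		System.out.println(idChunk.size() >= 2); // false -> history prefetch skipped (the bug)
		System.out.println(!idChunk.isEmpty());  // true  -> chunk is processed
	}
}
```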


@@ -2549,7 +2549,7 @@ public class FhirResourceDaoR4QueryCountTest extends BaseResourceProviderR4Test
 ourLog.debug(myFhirContext.newJsonParser().setPrettyPrint(true).encodeResourceToString(output));
 myCaptureQueriesListener.logSelectQueriesForCurrentThread();
-assertEquals(3, myCaptureQueriesListener.countSelectQueriesForCurrentThread());
+assertEquals(2, myCaptureQueriesListener.countSelectQueriesForCurrentThread());
 myCaptureQueriesListener.logInsertQueriesForCurrentThread();
 assertEquals(2, myCaptureQueriesListener.countInsertQueriesForCurrentThread());
 myCaptureQueriesListener.logUpdateQueriesForCurrentThread();


@@ -428,7 +428,7 @@ public class FhirResourceDaoR4VersionedReferenceTest extends BaseJpaR4Test {
 IdType observationId = new IdType(outcome.getEntry().get(1).getResponse().getLocation());
 
 // Make sure we're not introducing any extra DB operations
-assertThat(myCaptureQueriesListener.logSelectQueries()).hasSize(3);
+assertThat(myCaptureQueriesListener.logSelectQueries()).hasSize(2);
 
 // Read back and verify that reference is now versioned
 observation = myObservationDao.read(observationId);

@@ -463,7 +463,7 @@ public class FhirResourceDaoR4VersionedReferenceTest extends BaseJpaR4Test {
 IdType observationId = new IdType(outcome.getEntry().get(1).getResponse().getLocation());
 
 // Make sure we're not introducing any extra DB operations
-assertThat(myCaptureQueriesListener.logSelectQueries()).hasSize(4);
+assertThat(myCaptureQueriesListener.logSelectQueries()).hasSize(3);
 
 // Read back and verify that reference is now versioned
 observation = myObservationDao.read(observationId);


@@ -180,6 +180,59 @@ public class ReindexTaskTest extends BaseJpaR4Test {
 }
 
+@Test
+public void testOptimizeStorage_AllVersions_SingleResourceWithMultipleVersion() {
+    // The difference between this test and testOptimizeStorage_AllVersions is that this one has only 1 resource
+    // (with multiple versions) in the db. There was a bug where, if only one resource was being reindexed, the
+    // resource wasn't processed for optimize storage.
+
+    // Setup
+    IIdType patientId = createPatient(withActiveTrue());
+    for (int i = 0; i < 10; i++) {
+        Patient p = new Patient();
+        p.setId(patientId.toUnqualifiedVersionless());
+        p.setActive(true);
+        p.addIdentifier().setValue(String.valueOf(i));
+        myPatientDao.update(p, mySrd);
+    }
+
+    // Move resource text to compressed storage, which we don't write to anymore but legacy
+    // data may exist that was previously stored there, so we're simulating that.
+    List<ResourceHistoryTable> allHistoryEntities = runInTransaction(() -> myResourceHistoryTableDao.findAll());
+    allHistoryEntities.forEach(t -> relocateResourceTextToCompressedColumn(t.getResourceId(), t.getVersion()));
+
+    runInTransaction(() -> {
+        assertEquals(11, myResourceHistoryTableDao.count());
+        for (ResourceHistoryTable history : myResourceHistoryTableDao.findAll()) {
+            assertNull(history.getResourceTextVc());
+            assertNotNull(history.getResource());
+        }
+    });
+
+    // execute
+    JobInstanceStartRequest startRequest = new JobInstanceStartRequest();
+    startRequest.setJobDefinitionId(JOB_REINDEX);
+    startRequest.setParameters(
+            new ReindexJobParameters()
+                    .setOptimizeStorage(ReindexParameters.OptimizeStorageModeEnum.ALL_VERSIONS)
+                    .setReindexSearchParameters(ReindexParameters.ReindexSearchParametersEnum.NONE)
+    );
+    Batch2JobStartResponse startResponse = myJobCoordinator.startInstance(mySrd, startRequest);
+    myBatch2JobHelper.awaitJobCompletion(startResponse);
+
+    // validate
+    runInTransaction(() -> {
+        assertEquals(11, myResourceHistoryTableDao.count());
+        for (ResourceHistoryTable history : myResourceHistoryTableDao.findAll()) {
+            assertNotNull(history.getResourceTextVc());
+            assertNull(history.getResource());
+        }
+    });
+
+    Patient patient = myPatientDao.read(patientId, mySrd);
+    assertTrue(patient.getActive());
+}
+
 @Test
 public void testOptimizeStorage_AllVersions_CopyProvenanceEntityData() {
     // Setup