fix + changelog

nathaniel.doef 2023-02-05 13:02:26 -05:00
parent 87df691fc2
commit 2cbea97ed0
5 changed files with 51 additions and 19 deletions

View File

@@ -0,0 +1,6 @@
+---
+type: fix
+issue: 4511
+jira: SMILE-6064
+title: "Previously, bulk export jobs were getting stuck in the `FINALIZE` state when performed
+  with many resources and a low Bulk Export File Maximum Capacity. This has been fixed."
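Background (an illustration, not part of the commit): a low file maximum capacity splits an export into many small work chunks, and the FINALIZE step must iterate every chunk for the step. The previous implementation, removed below, fetched those chunks with offset-based paging, one page per transaction. Paging by offset over data that is being mutated mid-iteration is a classic way to silently skip rows; whether or not that exact mechanism was the trigger here, the commit removes the paging entirely. A minimal sketch of the pitfall, with hypothetical names:

import java.util.ArrayList;
import java.util.List;

// Illustration only: offset paging over a result set that shrinks as it is
// processed silently skips the rows that shift into already-visited pages.
class OffsetPagingPitfall {
	public static void main(String[] args) {
		List<String> pending = new ArrayList<>(List.of("chunk-a", "chunk-b", "chunk-c", "chunk-d"));
		int pageSize = 2;
		for (int page = 0; page * pageSize < pending.size(); page++) {
			List<String> fetched = new ArrayList<>(
				pending.subList(page * pageSize, Math.min((page + 1) * pageSize, pending.size())));
			pending.removeAll(fetched); // processing mutates the data the offsets are computed against
			System.out.println("page " + page + ": " + fetched);
		}
		System.out.println("never visited: " + pending); // prints [chunk-c, chunk-d]
	}
}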

View File

@@ -29,13 +29,13 @@ import ca.uhn.fhir.batch2.model.MarkWorkChunkAsErrorRequest;
 import ca.uhn.fhir.batch2.model.StatusEnum;
 import ca.uhn.fhir.batch2.model.WorkChunk;
 import ca.uhn.fhir.batch2.models.JobInstanceFetchRequest;
-import ca.uhn.fhir.util.Logs;
 import ca.uhn.fhir.jpa.dao.data.IBatch2JobInstanceRepository;
 import ca.uhn.fhir.jpa.dao.data.IBatch2WorkChunkRepository;
 import ca.uhn.fhir.jpa.entity.Batch2JobInstanceEntity;
 import ca.uhn.fhir.jpa.entity.Batch2WorkChunkEntity;
 import ca.uhn.fhir.jpa.util.JobInstanceUtil;
 import ca.uhn.fhir.model.api.PagingIterator;
+import ca.uhn.fhir.util.Logs;
 import org.apache.commons.collections4.ListUtils;
 import org.apache.commons.lang3.Validate;
 import org.slf4j.Logger;
@@ -307,16 +307,6 @@ public class JpaJobPersistenceImpl implements IJobPersistence {
 		return myTxTemplate.execute(tx -> myWorkChunkRepository.fetchAllChunkIdsForStepWithStatus(theInstanceId, theStepId, theStatusEnum));
 	}
 
-	private void fetchChunksForStep(String theInstanceId, String theStepId, int thePageSize, int thePageIndex, Consumer<WorkChunk> theConsumer) {
-		myTxTemplate.executeWithoutResult(tx -> {
-			List<Batch2WorkChunkEntity> chunks = myWorkChunkRepository.fetchChunksForStep(PageRequest.of(thePageIndex, thePageSize), theInstanceId, theStepId);
-			for (Batch2WorkChunkEntity chunk : chunks) {
-				theConsumer.accept(toChunk(chunk, true));
-			}
-		});
-	}
-
 	/**
 	 * Note: Not @Transactional because the transaction happens in a lambda that's called outside of this method's scope
 	 */
@@ -325,12 +315,11 @@ public class JpaJobPersistenceImpl implements IJobPersistence {
 		return new PagingIterator<>((thePageIndex, theBatchSize, theConsumer) -> fetchChunks(theInstanceId, theWithData, theBatchSize, thePageIndex, theConsumer));
 	}
 
-	/**
-	 * Note: Not @Transactional because the transaction happens in a lambda that's called outside of this method's scope
-	 */
 	@Override
+	@Transactional(propagation = Propagation.REQUIRES_NEW)
 	public Iterator<WorkChunk> fetchAllWorkChunksForStepIterator(String theInstanceId, String theStepId) {
-		return new PagingIterator<>((thePageIndex, theBatchSize, theConsumer) -> fetchChunksForStep(theInstanceId, theStepId, theBatchSize, thePageIndex, theConsumer));
+		List<Batch2WorkChunkEntity> entities = myWorkChunkRepository.fetchChunksForStep(theInstanceId, theStepId);
+		return new JpaWorkChunkIterator(entities.iterator());
 	}
 
 	/**

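A note on the propagation choice above (a generic Spring sketch with hypothetical names, not code from this commit): Propagation.REQUIRES_NEW suspends any transaction the caller already holds and runs the annotated method in a fresh one, so the full chunk list for the step is read as one self-contained unit of work before iteration begins.

import java.util.List;

import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Propagation;
import org.springframework.transaction.annotation.Transactional;

@Service
class SnapshotReader {

	// Runs in its own transaction even when the caller is already transactional;
	// the caller's transaction is suspended until this method returns.
	@Transactional(propagation = Propagation.REQUIRES_NEW)
	public List<String> readSnapshot() {
		return List.of("row-1", "row-2"); // stands in for a repository query
	}
}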
View File

@@ -0,0 +1,27 @@
+package ca.uhn.fhir.jpa.batch2;
+
+import ca.uhn.fhir.batch2.model.WorkChunk;
+import ca.uhn.fhir.jpa.entity.Batch2WorkChunkEntity;
+import ca.uhn.fhir.jpa.util.JobInstanceUtil;
+
+import java.util.Iterator;
+
+public class JpaWorkChunkIterator implements Iterator<WorkChunk> {
+
+	private final Iterator<Batch2WorkChunkEntity> myWorkChunkEntities;
+
+	public JpaWorkChunkIterator(Iterator<Batch2WorkChunkEntity> theWorkChunkEntities) {
+		myWorkChunkEntities = theWorkChunkEntities;
+	}
+
+	@Override
+	public boolean hasNext() {
+		return myWorkChunkEntities.hasNext();
+	}
+
+	@Override
+	public WorkChunk next() {
+		Batch2WorkChunkEntity next = myWorkChunkEntities.next();
+		return JobInstanceUtil.fromEntityToWorkChunk(next, true);
+	}
+}
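How the adapter is consumed (a hypothetical fragment mirroring fetchAllWorkChunksForStepIterator above; the variable names are stand-ins, not part of the commit):

// Hypothetical wiring; theRepository, theInstanceId, and theStepId are assumed.
List<Batch2WorkChunkEntity> entities = theRepository.fetchChunksForStep(theInstanceId, theStepId);
Iterator<WorkChunk> chunks = new JpaWorkChunkIterator(entities.iterator());
while (chunks.hasNext()) {
	WorkChunk chunk = chunks.next(); // entity-to-model conversion happens lazily, one element at a time
}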

View File

@@ -40,7 +40,7 @@ public interface IBatch2WorkChunkRepository extends JpaRepository<Batch2WorkChun
 	List<StatusEnum> getDistinctStatusesForStep(@Param("instanceId") String theInstanceId, @Param("stepId") String theStepId);
 
 	@Query("SELECT e FROM Batch2WorkChunkEntity e WHERE e.myInstanceId = :instanceId AND e.myTargetStepId = :targetStepId ORDER BY e.mySequence ASC")
-	List<Batch2WorkChunkEntity> fetchChunksForStep(Pageable thePageRequest, @Param("instanceId") String theInstanceId, @Param("targetStepId") String theTargetStepId);
+	List<Batch2WorkChunkEntity> fetchChunksForStep(@Param("instanceId") String theInstanceId, @Param("targetStepId") String theTargetStepId);
 
 	@Modifying
 	@Query("UPDATE Batch2WorkChunkEntity e SET e.myStatus = :status, e.myEndTime = :et, e.myRecordsProcessed = :rp, e.mySerializedData = null WHERE e.myId = :id")

View File

@@ -46,6 +46,7 @@ import org.springframework.beans.factory.annotation.Autowired;
 import java.io.IOException;
 import java.io.StringReader;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.Date;
 import java.util.HashSet;
@@ -629,10 +630,12 @@ public class BulkDataExportTest extends BaseResourceProviderR4Test {
 		myDaoConfig.setIndexMissingFields(DaoConfig.IndexEnabledEnum.ENABLED);
 
 		RequestDetails details = new SystemRequestDetails();
+		List<String> patientIds = new ArrayList<>();
 		for (int i = 0; i < numPatients; i++) {
 			Patient patient = new Patient();
 			patient.getNameFirstRep().addGiven("Patient-" + i);
-			myPatientDao.create(patient, details);
+			patient = (Patient) myPatientDao.create(patient, details).getResource();
+			patientIds.add(patient.getIdElement().toUnqualifiedVersionless().toString());
 		}
 
 		int patientsCreated = myPatientDao.search(SearchParameterMap.newSynchronous(), details).size();
@@ -647,6 +650,9 @@ public class BulkDataExportTest extends BaseResourceProviderR4Test {
 		Batch2JobStartResponse job2 = myJobRunner.startNewJob(BulkExportUtils.createBulkExportJobParametersFromExportOptions(options));
 		myBatch2JobHelper.awaitJobCompletion(job1.getJobId());
 		myBatch2JobHelper.awaitJobCompletion(job2.getJobId());
+
+		verifyReport(patientIds, Collections.emptyList(), job1);
+		verifyReport(patientIds, Collections.emptyList(), job2);
 	}
 
 	@ParameterizedTest
@@ -700,10 +706,14 @@ public class BulkDataExportTest extends BaseResourceProviderR4Test {
 
 		// Run a scheduled pass to build the export
 		myBatch2JobHelper.awaitJobCompletion(startResponse.getJobId());
 
-		await().until(() -> myJobRunner.getJobInfo(startResponse.getJobId()).getReport() != null);
+		verifyReport(theContainedList, theExcludedList, startResponse);
+	}
+
+	private void verifyReport(List<String> theContainedList, List<String> theExcludedList, Batch2JobStartResponse theStartResponse) {
+		await().until(() -> myJobRunner.getJobInfo(theStartResponse.getJobId()).getReport() != null);
 
 		// Iterate over the files
-		String report = myJobRunner.getJobInfo(startResponse.getJobId()).getReport();
+		String report = myJobRunner.getJobInfo(theStartResponse.getJobId()).getReport();
 		BulkExportJobResults results = JsonUtil.deserialize(report, BulkExportJobResults.class);
 		Set<String> foundIds = new HashSet<>();