Prevent batch2 job execution from stopping for empty chunk when last job step is a reduction (#5635)

* Prevent batch2 job execution from stopping for an empty chunk when the last job step is a reduction.
Add an output entry to the bulk export result even when it is empty.

* Fix test

* Unimportant change to force fresh build

* Implement review suggestions

---------

Co-authored-by: juan.marchionatto <juan.marchionatto@smilecdr.com>
jmarchionatto 2024-01-30 17:08:50 -05:00 committed by GitHub
parent 16aa9fcdf2
commit 5ed30f3181
8 changed files with 148 additions and 22 deletions

Changelog entry (new file)

@@ -0,0 +1,6 @@
---
type: fix
issue: 5632
title: "Previously bulk export operation was returning an empty response when no resources matched the request, which
didn't comply with [HL7 HAPI IG](https://hl7.org/fhir/uv/bulkdata/export/index.html#response---complete-status).
This has been corrected."
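For reference, the IG's complete-status response requires the `output` (and `error`) arrays to be present even when no resources matched, so an empty export is expected to poll back something like this (illustrative values only):

```json
{
  "transactionTime": "2024-01-30T17:08:50-05:00",
  "request": "[base]/$export?_type=Patient",
  "requiresAccessToken": true,
  "output": [],
  "error": []
}
```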

BulkExportUseCaseTest.java

@@ -87,6 +87,7 @@ import static org.hamcrest.Matchers.empty;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.hasSize;
import static org.hamcrest.Matchers.not;
import static org.hamcrest.Matchers.startsWith;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotNull;
@@ -299,9 +300,9 @@ public class BulkExportUseCaseTest extends BaseResourceProviderR4Test {
assertThat(result.getRequiresAccessToken(), is(equalTo(true)));
assertThat(result.getTransactionTime(), is(notNullValue()));
assertEquals(result.getOutput().size(), 3);
assertEquals(1, result.getOutput().stream().filter(o -> o.getType().equals("Patient")).collect(Collectors.toList()).size());
assertEquals(1, result.getOutput().stream().filter(o -> o.getType().equals("Observation")).collect(Collectors.toList()).size());
assertEquals(1, result.getOutput().stream().filter(o -> o.getType().equals("Encounter")).collect(Collectors.toList()).size());
assertEquals(1, result.getOutput().stream().filter(o -> o.getType().equals("Patient")).count());
assertEquals(1, result.getOutput().stream().filter(o -> o.getType().equals("Observation")).count());
assertEquals(1, result.getOutput().stream().filter(o -> o.getType().equals("Encounter")).count());
//We assert specifically on content as the deserialized version will "helpfully" fill in missing fields.
assertThat(responseContent, containsString("\"error\" : [ ]"));
@@ -338,8 +339,8 @@ public class BulkExportUseCaseTest extends BaseResourceProviderR4Test {
assertThat(result.getRequiresAccessToken(), is(equalTo(true)));
assertThat(result.getTransactionTime(), is(notNullValue()));
assertEquals(result.getOutput().size(), 1);
assertEquals(1, result.getOutput().stream().filter(o -> o.getType().equals("Patient")).collect(Collectors.toList()).size());
assertEquals(0, result.getOutput().stream().filter(o -> o.getType().equals("Binary")).collect(Collectors.toList()).size());
assertEquals(1, result.getOutput().stream().filter(o -> o.getType().equals("Patient")).count());
assertEquals(0, result.getOutput().stream().filter(o -> o.getType().equals("Binary")).count());
//We assert specifically on content as the deserialized version will "helpfully" fill in missing fields.
assertThat(responseContent, containsString("\"error\" : [ ]"));
@@ -381,7 +382,7 @@ public class BulkExportUseCaseTest extends BaseResourceProviderR4Test {
}
HashSet<String> types = Sets.newHashSet("Patient");
BulkExportJobResults bulkExportJobResults = startSystemBulkExportJobAndAwaitCompletion(types, new HashSet<String>());
BulkExportJobResults bulkExportJobResults = startSystemBulkExportJobAndAwaitCompletion(types, new HashSet<>());
Map<String, List<String>> resourceTypeToBinaryIds = bulkExportJobResults.getResourceTypeToBinaryIds();
assertThat(resourceTypeToBinaryIds.get("Patient"), hasSize(1));
String patientBinaryId = resourceTypeToBinaryIds.get("Patient").get(0);
@@ -477,7 +478,7 @@ public class BulkExportUseCaseTest extends BaseResourceProviderR4Test {
String entities = myJobInstanceRepository
.findAll()
.stream()
.map(t -> t.toString())
.map(Batch2JobInstanceEntity::toString)
.collect(Collectors.joining("\n * "));
ourLog.info("Entities:\n * " + entities);
});
@@ -492,6 +493,41 @@ public class BulkExportUseCaseTest extends BaseResourceProviderR4Test {
assertEquals(patientCount, jobInstance.getCombinedRecordsProcessed());
}
@Test
public void testEmptyExport() {
BulkExportJobParameters options = new BulkExportJobParameters();
options.setResourceTypes(Collections.singleton("Patient"));
options.setFilters(Collections.emptySet());
options.setExportStyle(BulkExportJobParameters.ExportStyle.SYSTEM);
options.setOutputFormat(Constants.CT_FHIR_NDJSON);
JobInstanceStartRequest startRequest = new JobInstanceStartRequest();
startRequest.setJobDefinitionId(Batch2JobDefinitionConstants.BULK_EXPORT);
startRequest.setParameters(options);
Batch2JobStartResponse startResponse = myJobCoordinator.startInstance(mySrd, startRequest);
assertNotNull(startResponse);
final String jobId = startResponse.getInstanceId();
// Run a scheduled pass to build the export
myBatch2JobHelper.awaitJobCompletion(startResponse.getInstanceId());
runInTransaction(() -> {
String entities = myJobInstanceRepository
.findAll()
.stream()
.map(Batch2JobInstanceEntity::toString)
.collect(Collectors.joining("\n * "));
ourLog.info("Entities:\n * " + entities);
});
final Optional<JobInstance> optJobInstance = myJobPersistence.fetchInstance(jobId);
assertNotNull(optJobInstance);
assertTrue(optJobInstance.isPresent());
assertThat(optJobInstance.get().getReport(),
containsString("Export complete, but no data to generate report for job instance:"));
}
private void logContentTypeAndResponse(Header[] headers, String response) {
ourLog.info("**************************");
ourLog.info("Content-Type is: {}", headers[0]);
@@ -542,7 +578,7 @@ public class BulkExportUseCaseTest extends BaseResourceProviderR4Test {
// test
HashSet<String> types = Sets.newHashSet("Patient", "Observation");
BulkExportJobResults bulkExportJobResults = startPatientBulkExportJobAndAwaitResults(types, new HashSet<String>(), "ha");
BulkExportJobResults bulkExportJobResults = startPatientBulkExportJobAndAwaitResults(types, new HashSet<>(), "ha");
Map<String, List<IBaseResource>> typeToResources = convertJobResultsToResources(bulkExportJobResults);
assertThat(typeToResources.get("Patient"), hasSize(1));
assertThat(typeToResources.get("Observation"), hasSize(1));
@@ -605,6 +641,34 @@ public class BulkExportUseCaseTest extends BaseResourceProviderR4Test {
assertTrue(patientIds.contains(resourceId));
}
}
@Test
public void testExportEmptyResult() {
BulkExportJobParameters options = new BulkExportJobParameters();
options.setResourceTypes(Sets.newHashSet("Patient"));
options.setExportStyle(BulkExportJobParameters.ExportStyle.PATIENT);
options.setOutputFormat(Constants.CT_FHIR_NDJSON);
JobInstanceStartRequest startRequest = new JobInstanceStartRequest();
startRequest.setJobDefinitionId(Batch2JobDefinitionConstants.BULK_EXPORT);
startRequest.setParameters(options);
Batch2JobStartResponse job = myJobCoordinator.startInstance(mySrd, startRequest);
myBatch2JobHelper.awaitJobCompletion(job.getInstanceId(), 60);
ourLog.debug("Job status after awaiting - {}", myJobCoordinator.getInstance(job.getInstanceId()).getStatus());
await()
.atMost(300, TimeUnit.SECONDS)
.until(() -> {
StatusEnum status = myJobCoordinator.getInstance(job.getInstanceId()).getStatus();
if (!StatusEnum.COMPLETED.equals(status)) {
fail("Job status was changed from COMPLETE to " + status);
}
return myJobCoordinator.getInstance(job.getInstanceId()).getReport() != null;
});
String report = myJobCoordinator.getInstance(job.getInstanceId()).getReport();
assertThat(report,
containsString("Export complete, but no data to generate report for job instance:"));
}
}
@@ -1081,7 +1145,7 @@ public class BulkExportUseCaseTest extends BaseResourceProviderR4Test {
}
}
]
}
}
""";
Bundle bundle = parser.parseResource(Bundle.class, bundleStr);
myClient.transaction().withBundle(bundle).execute();
@@ -1218,6 +1282,21 @@ public class BulkExportUseCaseTest extends BaseResourceProviderR4Test {
assertThat(typeToContents.get("Patient"), not(containsString("POG2")));
}
@Test
public void testExportEmptyResult() {
Group group = new Group();
group.setId("Group/G-empty");
group.setActive(true);
myClient.update().resource(group).execute();
HashSet<String> resourceTypes = Sets.newHashSet("Patient");
BulkExportJobResults bulkExportJobResults = startGroupBulkExportJobAndAwaitCompletion(
resourceTypes, new HashSet<>(), "G-empty");
assertThat(bulkExportJobResults.getReportMsg(),
startsWith("Export complete, but no data to generate report for job instance:"));
}
@Test
public void testGroupBulkExportMultipleResourceTypes() {
Patient patient = new Patient();
@@ -1398,7 +1477,7 @@ public class BulkExportUseCaseTest extends BaseResourceProviderR4Test {
Map<String, List<IBaseResource>> convertJobResultsToResources(BulkExportJobResults theResults) {
Map<String, String> stringStringMap = convertJobResultsToStringContents(theResults);
Map<String, List<IBaseResource>> typeToResources = new HashMap<>();
stringStringMap.entrySet().forEach(entry -> typeToResources.put(entry.getKey(), convertNDJSONToResources(entry.getValue())));
stringStringMap.forEach((key, value) -> typeToResources.put(key, convertNDJSONToResources(value)));
return typeToResources;
}
@@ -1412,8 +1491,7 @@ public class BulkExportUseCaseTest extends BaseResourceProviderR4Test {
private String getBinaryContentsAsString(String theBinaryId) {
Binary binary = myBinaryDao.read(new IdType(theBinaryId));
assertEquals(Constants.CT_FHIR_NDJSON, binary.getContentType());
String contents = new String(binary.getContent(), Constants.CHARSET_UTF8);
return contents;
return new String(binary.getContent(), Constants.CHARSET_UTF8);
}
BulkExportJobResults startGroupBulkExportJobAndAwaitCompletion(HashSet<String> theResourceTypes, HashSet<String> theFilters, String theGroupId) {
@@ -1509,8 +1587,7 @@ public class BulkExportUseCaseTest extends BaseResourceProviderR4Test {
await().atMost(300, TimeUnit.SECONDS).until(() -> myJobCoordinator.getInstance(jobInstanceId).getReport() != null);
String report = myJobCoordinator.getInstance(jobInstanceId).getReport();
BulkExportJobResults results = JsonUtil.deserialize(report, BulkExportJobResults.class);
return results;
return JsonUtil.deserialize(report, BulkExportJobResults.class);
}
private void verifyBulkExportResults(String theGroupId, HashSet<String> theFilters, List<String> theContainedList, List<String> theExcludedList) {

BulkDataExportProvider.java

@@ -502,6 +502,9 @@ public class BulkDataExportProvider {
String serverBase = getServerBase(theRequestDetails);
// an output is required, even if empty, according to HL7 FHIR IG
bulkResponseDocument.getOutput();
for (Map.Entry<String, List<String>> entrySet :
results.getResourceTypeToBinaryIds().entrySet()) {
String resourceType = entrySet.getKey();
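This one-line addition works if `getOutput()` follows the lazy-initialization getter convention common in HAPI model classes: merely touching the getter materializes an empty list that later serializes as `"output": []`. A minimal sketch of that convention (names illustrative, not the exact HAPI source):

```java
// Lazy-init getter: calling it guarantees a non-null (possibly empty) list,
// so even an empty export serializes an "output" array instead of omitting it.
public List<Output> getOutput() {
    if (myOutput == null) {
        myOutput = new ArrayList<>();
    }
    return myOutput;
}
```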

JobStepExecutor.java

@@ -75,9 +75,10 @@ public class JobStepExecutor<PT extends IModelJson, IT extends IModelJson, OT extends IModelJson> {
return;
}
if (stepExecutorOutput.getDataSink().firstStepProducedNothing()) {
if (stepExecutorOutput.getDataSink().firstStepProducedNothing() && !myDefinition.isLastStepReduction()) {
ourLog.info(
"First step of job myInstance {} produced no work chunks, marking as completed and setting end date",
"First step of job myInstance {} produced no work chunks and last step is not a reduction, "
+ "marking as completed and setting end date",
myInstanceId);
myJobPersistence.updateInstance(myInstance.getInstanceId(), instance -> {
instance.setEndTime(new Date());

JobDefinition.java

@@ -145,6 +145,11 @@ public class JobDefinition<PT extends IModelJson> {
return myGatedExecution;
}
public boolean isLastStepReduction() {
int stepCount = getSteps().size();
return stepCount >= 1 && getSteps().get(stepCount - 1).isReductionStep();
}
public int getStepIndex(String theStepId) {
int retVal = myStepIds.indexOf(theStepId);
Validate.isTrue(retVal != -1);
@@ -304,9 +309,9 @@
throw new ConfigurationException(Msg.code(2106)
+ String.format("Job Definition %s has a reducer step but is not gated", myJobDefinitionId));
}
mySteps.add(new JobDefinitionReductionStep<PT, NIT, OT>(
mySteps.add(new JobDefinitionReductionStep<>(
theStepId, theStepDescription, theStepWorker, myNextInputType, theOutputType));
return new Builder<PT, OT>(
return new Builder<>(
mySteps,
myJobDefinitionId,
myJobDefinitionVersion,

InstanceProgress.java

@@ -192,12 +192,12 @@ public class InstanceProgress {
/**
* Transitions from IN_PROGRESS/ERRORED based on chunk statuses.
*/
public void calculateNewStatus() {
public void calculateNewStatus(boolean theLastStepIsReduction) {
if (myFailedChunkCount > 0) {
myNewStatus = StatusEnum.FAILED;
} else if (myErroredChunkCount > 0) {
myNewStatus = StatusEnum.ERRORED;
} else if (myIncompleteChunkCount == 0 && myCompleteChunkCount > 0) {
} else if (myIncompleteChunkCount == 0 && myCompleteChunkCount > 0 && !theLastStepIsReduction) {
myNewStatus = StatusEnum.COMPLETED;
}
}
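The practical effect of the new flag: when every chunk is complete but the job ends in a reduction, the instance is no longer auto-flipped to COMPLETED, which leaves the reduction step free to run (even over zero chunks) and emit the final report. A hypothetical standalone restatement of the same decision logic:

```java
// Sketch only: mirrors calculateNewStatus() as shown in the diff above.
// Returning null means "leave the current status untouched".
static StatusEnum newStatus(
        int failedChunks, int erroredChunks, int incompleteChunks,
        int completeChunks, boolean lastStepIsReduction) {
    if (failedChunks > 0) {
        return StatusEnum.FAILED;
    }
    if (erroredChunks > 0) {
        return StatusEnum.ERRORED;
    }
    if (incompleteChunks == 0 && completeChunks > 0 && !lastStepIsReduction) {
        return StatusEnum.COMPLETED;
    }
    return null;
}
```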

JobInstanceProgressCalculator.java

@@ -22,19 +22,26 @@ package ca.uhn.fhir.batch2.progress;
import ca.uhn.fhir.batch2.api.IJobPersistence;
import ca.uhn.fhir.batch2.coordinator.JobDefinitionRegistry;
import ca.uhn.fhir.batch2.maintenance.JobChunkProgressAccumulator;
import ca.uhn.fhir.batch2.model.JobDefinition;
import ca.uhn.fhir.batch2.model.JobInstance;
import ca.uhn.fhir.batch2.model.WorkChunk;
import ca.uhn.fhir.i18n.Msg;
import ca.uhn.fhir.model.api.IModelJson;
import ca.uhn.fhir.rest.server.exceptions.InternalErrorException;
import ca.uhn.fhir.util.Logs;
import ca.uhn.fhir.util.StopWatch;
import jakarta.annotation.Nonnull;
import org.slf4j.Logger;
import java.util.Iterator;
import java.util.Optional;
public class JobInstanceProgressCalculator {
private static final Logger ourLog = Logs.getBatchTroubleshootingLog();
private final IJobPersistence myJobPersistence;
private final JobChunkProgressAccumulator myProgressAccumulator;
private final JobInstanceStatusUpdater myJobInstanceStatusUpdater;
private final JobDefinitionRegistry myJobDefinitionRegistry;
public JobInstanceProgressCalculator(
IJobPersistence theJobPersistence,
@@ -42,6 +49,7 @@ public class JobInstanceProgressCalculator {
JobDefinitionRegistry theJobDefinitionRegistry) {
myJobPersistence = theJobPersistence;
myProgressAccumulator = theProgressAccumulator;
myJobDefinitionRegistry = theJobDefinitionRegistry;
myJobInstanceStatusUpdater = new JobInstanceStatusUpdater(theJobDefinitionRegistry);
}
@@ -96,8 +104,20 @@
}
// wipmb separate status update from stats collection in 6.8
instanceProgress.calculateNewStatus();
instanceProgress.calculateNewStatus(lastStepIsReduction(instanceId));
return instanceProgress;
}
private boolean lastStepIsReduction(String theInstanceId) {
JobInstance jobInstance = getJobInstance(theInstanceId);
JobDefinition<IModelJson> jobDefinition = myJobDefinitionRegistry.getJobDefinitionOrThrowException(jobInstance);
return jobDefinition.isLastStepReduction();
}
private JobInstance getJobInstance(String theInstanceId) {
Optional<JobInstance> oInstance = myJobPersistence.fetchInstance(theInstanceId);
return oInstance.orElseThrow(() ->
new InternalErrorException(Msg.code(2486) + "Failed to fetch JobInstance with id: " + theInstanceId));
}
}

ReductionStepDataSinkTest.java

@@ -22,6 +22,7 @@ import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;
import java.util.Collections;
import java.util.Optional;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertThrows;
@@ -30,6 +31,7 @@ import static org.junit.jupiter.api.Assertions.fail;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyBoolean;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
@ExtendWith(MockitoExtension.class)
@@ -90,12 +92,16 @@ public class ReductionStepDataSinkTest {
String data = "data";
StepOutputData stepData = new StepOutputData(data);
WorkChunkData<StepOutputData> chunkData = new WorkChunkData<>(stepData);
@SuppressWarnings("unchecked")
JobDefinition<IModelJson> jobDefinition = mock(JobDefinition.class);
// when
JobInstance instance = JobInstance.fromInstanceId(INSTANCE_ID);
instance.setStatus(StatusEnum.FINALIZE);
stubUpdateInstanceCallback(instance);
when(myJobPersistence.fetchAllWorkChunksIterator(any(), anyBoolean())).thenReturn(Collections.emptyIterator());
when(myJobPersistence.fetchInstance(INSTANCE_ID)).thenReturn(Optional.of(instance));
when(myJobDefinitionRegistry.getJobDefinitionOrThrowException(instance)).thenReturn(jobDefinition);
// test
myDataSink.accept(chunkData);
@@ -111,6 +117,8 @@
String data2 = "data2";
WorkChunkData<StepOutputData> firstData = new WorkChunkData<>(new StepOutputData(data));
WorkChunkData<StepOutputData> secondData = new WorkChunkData<>(new StepOutputData(data2));
@SuppressWarnings("unchecked")
JobDefinition<IModelJson> jobDefinition = mock(JobDefinition.class);
ourLogger.setLevel(Level.ERROR);
@@ -118,6 +126,8 @@
instance.setStatus(StatusEnum.FINALIZE);
when(myJobPersistence.fetchAllWorkChunksIterator(any(), anyBoolean())).thenReturn(Collections.emptyIterator());
stubUpdateInstanceCallback(instance);
when(myJobPersistence.fetchInstance(INSTANCE_ID)).thenReturn(Optional.of(instance));
when(myJobDefinitionRegistry.getJobDefinitionOrThrowException(instance)).thenReturn(jobDefinition);
// test
myDataSink.accept(firstData);
@@ -136,10 +146,15 @@
@Test
public void accept_noInstanceIdFound_throwsJobExecutionFailed() {
// setup
JobInstance jobInstance = mock(JobInstance.class);
@SuppressWarnings("unchecked")
JobDefinition<IModelJson> jobDefinition = (JobDefinition<IModelJson>) mock(JobDefinition.class);
String data = "data";
WorkChunkData<StepOutputData> chunkData = new WorkChunkData<>(new StepOutputData(data));
when(myJobPersistence.updateInstance(any(), any())).thenReturn(false);
when(myJobPersistence.fetchAllWorkChunksIterator(any(), anyBoolean())).thenReturn(Collections.emptyIterator());
when(myJobPersistence.fetchInstance(INSTANCE_ID)).thenReturn(Optional.of(jobInstance));
when(myJobDefinitionRegistry.getJobDefinitionOrThrowException(jobInstance)).thenReturn(jobDefinition);
// test
try {
@@ -151,5 +166,4 @@
fail("Unexpected exception", anyOtherEx);
}
}
}