Bulk export permanently reusing cached results (#4249)

* Add test, fix bug, add changelog

* minor refactor
This commit is contained in:
Tadgh 2022-11-04 16:04:03 -07:00 committed by GitHub
parent e6b80d3e3e
commit a29629987d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 60 additions and 17 deletions

View File

@ -0,0 +1,4 @@
---
type: fix
issue: 4247
title: "Previously, Bulk Export jobs were always reused, even after they had completed. Now a job is only reused if an identical job is already queued or in progress and has not yet completed or failed."

View File

@ -32,6 +32,7 @@ import org.hl7.fhir.r4.model.IdType;
import org.hl7.fhir.r4.model.Observation;
import org.hl7.fhir.r4.model.Patient;
import org.hl7.fhir.r4.model.Reference;
import org.jetbrains.annotations.NotNull;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Nested;
@ -42,6 +43,7 @@ import org.springframework.beans.factory.annotation.Autowired;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
@ -78,6 +80,29 @@ public class BulkExportUseCaseTest extends BaseResourceProviderR4Test {
@Nested
public class SpecConformanceTests {
@Test
public void testBatchJobsAreOnlyReusedIfInProgress() throws IOException {
//Given a patient exists
Patient p = new Patient();
p.setId("Pat-1");
myClient.update().resource(p).execute();
//And Given we start a bulk export job
String pollingLocation = submitBulkExportForTypes("Patient");
String jobId = getJobIdFromPollingLocation(pollingLocation);
myBatch2JobHelper.awaitJobCompletion(jobId);
//When we execute another batch job, it should not have the same job id.
String secondPollingLocation = submitBulkExportForTypes("Patient");
String secondJobId = getJobIdFromPollingLocation(secondPollingLocation);
//Then the job id should be different
assertThat(secondJobId, not(equalTo(jobId)));
myBatch2JobHelper.awaitJobCompletion(secondJobId);
}
@Test
public void testPollingLocationContainsAllRequiredAttributesUponCompletion() throws IOException {
@ -87,14 +112,8 @@ public class BulkExportUseCaseTest extends BaseResourceProviderR4Test {
myClient.update().resource(p).execute();
//And Given we start a bulk export job
HttpGet httpGet = new HttpGet(myClient.getServerBase() + "/$export?_type=Patient");
httpGet.addHeader(Constants.HEADER_PREFER, Constants.HEADER_PREFER_RESPOND_ASYNC);
String pollingLocation;
try (CloseableHttpResponse status = ourHttpClient.execute(httpGet)) {
Header[] headers = status.getHeaders("Content-Location");
pollingLocation = headers[0].getValue();
}
String jobId = pollingLocation.substring(pollingLocation.indexOf("_jobId=") + 7);
String pollingLocation = submitBulkExportForTypes("Patient");
String jobId = getJobIdFromPollingLocation(pollingLocation);
myBatch2JobHelper.awaitJobCompletion(jobId);
//Then: When the poll shows as complete, all attributes should be filled.
@ -116,6 +135,11 @@ public class BulkExportUseCaseTest extends BaseResourceProviderR4Test {
}
}
@NotNull
private String getJobIdFromPollingLocation(String pollingLocation) {
return pollingLocation.substring(pollingLocation.indexOf("_jobId=") + 7);
}
@Test
public void export_shouldExportPatientResource_whenTypeParameterOmitted() throws IOException {
@ -132,7 +156,7 @@ public class BulkExportUseCaseTest extends BaseResourceProviderR4Test {
Header[] headers = status.getHeaders("Content-Location");
pollingLocation = headers[0].getValue();
}
String jobId = pollingLocation.substring(pollingLocation.indexOf("_jobId=") + 7);
String jobId = getJobIdFromPollingLocation(pollingLocation);
myBatch2JobHelper.awaitJobCompletion(jobId);
//Then: When the poll shows as complete, all attributes should be filled.
@ -176,7 +200,7 @@ public class BulkExportUseCaseTest extends BaseResourceProviderR4Test {
Header[] headers = status.getHeaders("Content-Location");
pollingLocation = headers[0].getValue();
}
String jobId = pollingLocation.substring(pollingLocation.indexOf("_jobId=") + 7);
String jobId = getJobIdFromPollingLocation(pollingLocation);
myBatch2JobHelper.awaitJobCompletion(jobId);
HttpGet statusGet = new HttpGet(pollingLocation);
@ -215,7 +239,7 @@ public class BulkExportUseCaseTest extends BaseResourceProviderR4Test {
Header[] headers = status.getHeaders("Content-Location");
pollingLocation = headers[0].getValue();
}
String jobId = pollingLocation.substring(pollingLocation.indexOf("_jobId=") + 7);
String jobId = getJobIdFromPollingLocation(pollingLocation);
myBatch2JobHelper.awaitJobCompletion(jobId);
HttpGet statusGet = new HttpGet(pollingLocation);
@ -237,6 +261,18 @@ public class BulkExportUseCaseTest extends BaseResourceProviderR4Test {
}
private String submitBulkExportForTypes(String... theTypes) throws IOException {
String typeString = String.join(",", theTypes);
HttpGet httpGet = new HttpGet(myClient.getServerBase() + "/$export?_type=" + typeString);
httpGet.addHeader(Constants.HEADER_PREFER, Constants.HEADER_PREFER_RESPOND_ASYNC);
String pollingLocation;
try (CloseableHttpResponse status = ourHttpClient.execute(httpGet)) {
Header[] headers = status.getHeaders("Content-Location");
pollingLocation = headers[0].getValue();
}
return pollingLocation;
}
@Nested
public class SystemBulkExportTests {

View File

@ -39,6 +39,7 @@ import ca.uhn.fhir.jpa.subscription.channel.api.IChannelReceiver;
import ca.uhn.fhir.rest.server.exceptions.InvalidRequestException;
import ca.uhn.fhir.rest.server.exceptions.ResourceNotFoundException;
import org.apache.commons.lang3.Validate;
import org.jetbrains.annotations.NotNull;
import org.slf4j.Logger;
import org.springframework.data.domain.Page;
import org.springframework.messaging.MessageHandler;
@ -96,14 +97,9 @@ public class JobCoordinatorImpl implements IJobCoordinator {
if (isBlank(paramsString)) {
throw new InvalidRequestException(Msg.code(2065) + "No parameters supplied");
}
// if cache - use that first
if (theStartRequest.isUseCache()) {
FetchJobInstancesRequest request = new FetchJobInstancesRequest(theStartRequest.getJobDefinitionId(), theStartRequest.getParameters(),
StatusEnum.QUEUED,
StatusEnum.IN_PROGRESS,
StatusEnum.COMPLETED
);
FetchJobInstancesRequest request = new FetchJobInstancesRequest(theStartRequest.getJobDefinitionId(), theStartRequest.getParameters(), getStatesThatTriggerCache());
List<JobInstance> existing = myJobPersistence.fetchInstances(request, 0, 1000);
if (!existing.isEmpty()) {
@ -142,6 +138,13 @@ public class JobCoordinatorImpl implements IJobCoordinator {
return response;
}
/**
* Cache will be used if an identical job is QUEUED or IN_PROGRESS. Otherwise a new one will kickoff.
*/
private StatusEnum[] getStatesThatTriggerCache() {
return new StatusEnum[]{StatusEnum.QUEUED, StatusEnum.IN_PROGRESS};
}
@Override
public JobInstance getInstance(String theInstanceId) {
return myJobQuerySvc.fetchInstance(theInstanceId);