disable batch job reuse (#4155)
This commit is contained in:
parent
a6ed393969
commit
5a31ce0386
|
@ -0,0 +1,7 @@
|
||||||
|
---
|
||||||
|
type: add
|
||||||
|
issue: 4154
|
||||||
|
jira: SMILE-5389
|
||||||
|
title: "By default, if the `$export` operation receives a request that is identical to one that has been recently
|
||||||
|
processed, it will attempt to reuse the batch job from the former request. A new configuration parameter has been
|
||||||
|
introduced that disables this behavior and forces a new batch job on every call."
|
|
@ -1,6 +1,7 @@
|
||||||
package ca.uhn.fhir.jpa.bulk;
|
package ca.uhn.fhir.jpa.bulk;
|
||||||
|
|
||||||
import ca.uhn.fhir.context.FhirContext;
|
import ca.uhn.fhir.context.FhirContext;
|
||||||
|
import ca.uhn.fhir.jpa.api.config.DaoConfig;
|
||||||
import ca.uhn.fhir.jpa.api.model.Batch2JobInfo;
|
import ca.uhn.fhir.jpa.api.model.Batch2JobInfo;
|
||||||
import ca.uhn.fhir.jpa.api.model.Batch2JobOperationResult;
|
import ca.uhn.fhir.jpa.api.model.Batch2JobOperationResult;
|
||||||
import ca.uhn.fhir.jpa.api.model.BulkExportJobResults;
|
import ca.uhn.fhir.jpa.api.model.BulkExportJobResults;
|
||||||
|
@ -12,7 +13,6 @@ import ca.uhn.fhir.jpa.bulk.export.model.BulkExportJobStatusEnum;
|
||||||
import ca.uhn.fhir.jpa.bulk.export.model.BulkExportResponseJson;
|
import ca.uhn.fhir.jpa.bulk.export.model.BulkExportResponseJson;
|
||||||
import ca.uhn.fhir.jpa.bulk.export.provider.BulkDataExportProvider;
|
import ca.uhn.fhir.jpa.bulk.export.provider.BulkDataExportProvider;
|
||||||
import ca.uhn.fhir.jpa.model.util.JpaConstants;
|
import ca.uhn.fhir.jpa.model.util.JpaConstants;
|
||||||
import ca.uhn.fhir.rest.annotation.Validate;
|
|
||||||
import ca.uhn.fhir.rest.api.Constants;
|
import ca.uhn.fhir.rest.api.Constants;
|
||||||
import ca.uhn.fhir.rest.client.apache.ResourceEntity;
|
import ca.uhn.fhir.rest.client.apache.ResourceEntity;
|
||||||
import ca.uhn.fhir.rest.server.HardcodedServerAddressStrategy;
|
import ca.uhn.fhir.rest.server.HardcodedServerAddressStrategy;
|
||||||
|
@ -44,7 +44,6 @@ import org.junit.jupiter.api.extension.ExtendWith;
|
||||||
import org.junit.jupiter.params.ParameterizedTest;
|
import org.junit.jupiter.params.ParameterizedTest;
|
||||||
import org.junit.jupiter.params.provider.Arguments;
|
import org.junit.jupiter.params.provider.Arguments;
|
||||||
import org.junit.jupiter.params.provider.MethodSource;
|
import org.junit.jupiter.params.provider.MethodSource;
|
||||||
import org.junit.jupiter.params.provider.ValueSource;
|
|
||||||
import org.mockito.ArgumentCaptor;
|
import org.mockito.ArgumentCaptor;
|
||||||
import org.mockito.InjectMocks;
|
import org.mockito.InjectMocks;
|
||||||
import org.mockito.Mock;
|
import org.mockito.Mock;
|
||||||
|
@ -93,6 +92,8 @@ public class BulkDataExportProviderTest {
|
||||||
@Mock
|
@Mock
|
||||||
private IBatch2JobRunner myJobRunner;
|
private IBatch2JobRunner myJobRunner;
|
||||||
|
|
||||||
|
private DaoConfig myDaoConfig;
|
||||||
|
|
||||||
private CloseableHttpClient myClient;
|
private CloseableHttpClient myClient;
|
||||||
|
|
||||||
@InjectMocks
|
@InjectMocks
|
||||||
|
@ -131,6 +132,12 @@ public class BulkDataExportProviderTest {
|
||||||
myClient = builder.build();
|
myClient = builder.build();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@BeforeEach
|
||||||
|
public void injectDaoConfig() {
|
||||||
|
myDaoConfig = new DaoConfig();
|
||||||
|
myProvider.setDaoConfig(myDaoConfig);
|
||||||
|
}
|
||||||
|
|
||||||
public void startWithFixedBaseUrl() {
|
public void startWithFixedBaseUrl() {
|
||||||
String baseUrl = "http://localhost:" + myPort + "/fixedvalue";
|
String baseUrl = "http://localhost:" + myPort + "/fixedvalue";
|
||||||
HardcodedServerAddressStrategy hardcodedServerAddressStrategy = new HardcodedServerAddressStrategy(baseUrl);
|
HardcodedServerAddressStrategy hardcodedServerAddressStrategy = new HardcodedServerAddressStrategy(baseUrl);
|
||||||
|
@ -737,6 +744,40 @@ public class BulkDataExportProviderTest {
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testProvider_whenEnableBatchJobReuseIsFalse_startsNewJob() throws IOException {
|
||||||
|
// setup
|
||||||
|
Batch2JobStartResponse startResponse = createJobStartResponse();
|
||||||
|
startResponse.setUsesCachedResult(true);
|
||||||
|
|
||||||
|
myDaoConfig.setEnableBulkExportJobReuse(false);
|
||||||
|
|
||||||
|
// when
|
||||||
|
when(myJobRunner.startNewJob(any(Batch2BaseJobParameters.class)))
|
||||||
|
.thenReturn(startResponse);
|
||||||
|
|
||||||
|
Parameters input = new Parameters();
|
||||||
|
input.addParameter(JpaConstants.PARAM_EXPORT_OUTPUT_FORMAT, new StringType(Constants.CT_FHIR_NDJSON));
|
||||||
|
input.addParameter(JpaConstants.PARAM_EXPORT_TYPE, new StringType("Patient, Practitioner"));
|
||||||
|
|
||||||
|
// call
|
||||||
|
HttpPost post = new HttpPost("http://localhost:" + myPort + "/" + JpaConstants.OPERATION_EXPORT);
|
||||||
|
post.addHeader(Constants.HEADER_PREFER, Constants.HEADER_PREFER_RESPOND_ASYNC);
|
||||||
|
post.setEntity(new ResourceEntity(myCtx, input));
|
||||||
|
ourLog.info("Request: {}", post);
|
||||||
|
try (CloseableHttpResponse response = myClient.execute(post)) {
|
||||||
|
ourLog.info("Response: {}", response.toString());
|
||||||
|
assertEquals(202, response.getStatusLine().getStatusCode());
|
||||||
|
assertEquals("Accepted", response.getStatusLine().getReasonPhrase());
|
||||||
|
assertEquals("http://localhost:" + myPort + "/$export-poll-status?_jobId=" + A_JOB_ID, response.getFirstHeader(Constants.HEADER_CONTENT_LOCATION).getValue());
|
||||||
|
}
|
||||||
|
|
||||||
|
// verify
|
||||||
|
BulkExportParameters parameters = verifyJobStart();
|
||||||
|
assertThat(parameters.isUseExistingJobsFirst(), is(equalTo(false)));
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testProviderReturnsSameIdForSameJob() throws IOException {
|
public void testProviderReturnsSameIdForSameJob() throws IOException {
|
||||||
// given
|
// given
|
||||||
|
|
|
@ -314,6 +314,11 @@ public class DaoConfig {
|
||||||
*/
|
*/
|
||||||
private int myBulkExportFileRetentionPeriodHours = 2;
|
private int myBulkExportFileRetentionPeriodHours = 2;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Since 6.2.0
|
||||||
|
*/
|
||||||
|
private boolean myEnableBulkExportJobReuse = true;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Since 6.1.0
|
* Since 6.1.0
|
||||||
*/
|
*/
|
||||||
|
@ -2901,6 +2906,22 @@ public class DaoConfig {
|
||||||
myBulkExportFileRetentionPeriodHours = theBulkExportFileRetentionPeriodHours;
|
myBulkExportFileRetentionPeriodHours = theBulkExportFileRetentionPeriodHours;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* This setting controls whether, upon receiving a request for an $export operation, if a batch job already exists
|
||||||
|
* that exactly matches the new request, the system should attempt to reuse the batch job. Default is true.
|
||||||
|
*/
|
||||||
|
public boolean getEnableBulkExportJobReuse() {
|
||||||
|
return myEnableBulkExportJobReuse;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* This setting controls whether, upon receiving a request for an $export operation, if a batch job already exists
|
||||||
|
* that exactly matches the new request, the system should attempt to reuse the batch job. Default is true.
|
||||||
|
*/
|
||||||
|
public void setEnableBulkExportJobReuse(boolean theEnableBulkExportJobReuse) {
|
||||||
|
myEnableBulkExportJobReuse = theEnableBulkExportJobReuse;
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* This setting indicates whether updating the history of a resource is allowed.
|
* This setting indicates whether updating the history of a resource is allowed.
|
||||||
* Default is false.
|
* Default is false.
|
||||||
|
|
|
@ -25,6 +25,7 @@ import ca.uhn.fhir.i18n.Msg;
|
||||||
import ca.uhn.fhir.interceptor.api.HookParams;
|
import ca.uhn.fhir.interceptor.api.HookParams;
|
||||||
import ca.uhn.fhir.interceptor.api.IInterceptorBroadcaster;
|
import ca.uhn.fhir.interceptor.api.IInterceptorBroadcaster;
|
||||||
import ca.uhn.fhir.interceptor.api.Pointcut;
|
import ca.uhn.fhir.interceptor.api.Pointcut;
|
||||||
|
import ca.uhn.fhir.jpa.api.config.DaoConfig;
|
||||||
import ca.uhn.fhir.jpa.api.model.Batch2JobInfo;
|
import ca.uhn.fhir.jpa.api.model.Batch2JobInfo;
|
||||||
import ca.uhn.fhir.jpa.api.model.Batch2JobOperationResult;
|
import ca.uhn.fhir.jpa.api.model.Batch2JobOperationResult;
|
||||||
import ca.uhn.fhir.jpa.api.model.BulkExportJobResults;
|
import ca.uhn.fhir.jpa.api.model.BulkExportJobResults;
|
||||||
|
@ -55,6 +56,7 @@ import ca.uhn.fhir.util.JsonUtil;
|
||||||
import ca.uhn.fhir.util.OperationOutcomeUtil;
|
import ca.uhn.fhir.util.OperationOutcomeUtil;
|
||||||
import ca.uhn.fhir.util.SearchParameterUtil;
|
import ca.uhn.fhir.util.SearchParameterUtil;
|
||||||
import ca.uhn.fhir.util.UrlUtil;
|
import ca.uhn.fhir.util.UrlUtil;
|
||||||
|
import com.google.common.annotations.VisibleForTesting;
|
||||||
import org.apache.commons.lang3.StringUtils;
|
import org.apache.commons.lang3.StringUtils;
|
||||||
import org.hl7.fhir.instance.model.api.IBaseOperationOutcome;
|
import org.hl7.fhir.instance.model.api.IBaseOperationOutcome;
|
||||||
import org.hl7.fhir.instance.model.api.IIdType;
|
import org.hl7.fhir.instance.model.api.IIdType;
|
||||||
|
@ -95,6 +97,9 @@ public class BulkDataExportProvider {
|
||||||
@Autowired
|
@Autowired
|
||||||
private IBatch2JobRunner myJobRunner;
|
private IBatch2JobRunner myJobRunner;
|
||||||
|
|
||||||
|
@Autowired
|
||||||
|
private DaoConfig myDaoConfig;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* $export
|
* $export
|
||||||
*/
|
*/
|
||||||
|
@ -147,7 +152,7 @@ public class BulkDataExportProvider {
|
||||||
|
|
||||||
private boolean shouldUseCache(ServletRequestDetails theRequestDetails) {
|
private boolean shouldUseCache(ServletRequestDetails theRequestDetails) {
|
||||||
CacheControlDirective cacheControlDirective = new CacheControlDirective().parse(theRequestDetails.getHeaders(Constants.HEADER_CACHE_CONTROL));
|
CacheControlDirective cacheControlDirective = new CacheControlDirective().parse(theRequestDetails.getHeaders(Constants.HEADER_CACHE_CONTROL));
|
||||||
return !cacheControlDirective.isNoCache();
|
return myDaoConfig.getEnableBulkExportJobReuse() && !cacheControlDirective.isNoCache();
|
||||||
}
|
}
|
||||||
|
|
||||||
private String getServerBase(ServletRequestDetails theRequestDetails) {
|
private String getServerBase(ServletRequestDetails theRequestDetails) {
|
||||||
|
@ -476,4 +481,9 @@ public class BulkDataExportProvider {
|
||||||
throw new InvalidRequestException(Msg.code(513) + "Must request async processing for " + theOperationName);
|
throw new InvalidRequestException(Msg.code(513) + "Must request async processing for " + theOperationName);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@VisibleForTesting
|
||||||
|
public void setDaoConfig(DaoConfig theDaoConfig) {
|
||||||
|
myDaoConfig = theDaoConfig;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue