Merge branch 'master' into ng_validation_s13n_interceptors

This commit is contained in:
Nick Goupinets 2021-03-15 10:32:04 -04:00
commit 744adb2d08
8 changed files with 184 additions and 25 deletions

View File

@ -185,7 +185,9 @@ public abstract class BaseApp {
loggingConfigOff();
validateJavaVersion();
AnsiConsole.systemInstall();
if (System.getProperty("unit_test") != null) {
AnsiConsole.systemInstall();
}
// log version while the logging is off
VersionUtil.getVersion();

View File

@ -0,0 +1,4 @@
---
type: fix
issue: 2471
title: "An NPE was fixed when performing highly concurrent system requests while using the ResourceVersionConflictResolutionStrategy interceptor."

View File

@ -132,12 +132,14 @@ public class BulkDataExportProvider {
}
private void validateResourceTypesAllContainPatientSearchParams(Set<String> theResourceTypes) {
List<String> badResourceTypes = theResourceTypes.stream()
.filter(resourceType -> !myBulkDataExportSvc.getPatientCompartmentResources().contains(resourceType))
.collect(Collectors.toList());
if (theResourceTypes != null) {
List<String> badResourceTypes = theResourceTypes.stream()
.filter(resourceType -> !myBulkDataExportSvc.getPatientCompartmentResources().contains(resourceType))
.collect(Collectors.toList());
if (!badResourceTypes.isEmpty()) {
throw new InvalidRequestException(String.format("Resource types [%s] are invalid for this type of export, as they do not contain search parameters that refer to patients.", String.join(",", badResourceTypes)));
if (!badResourceTypes.isEmpty()) {
throw new InvalidRequestException(String.format("Resource types [%s] are invalid for this type of export, as they do not contain search parameters that refer to patients.", String.join(",", badResourceTypes)));
}
}
}

View File

@ -24,8 +24,11 @@ import ca.uhn.fhir.interceptor.api.Hook;
import ca.uhn.fhir.interceptor.api.Interceptor;
import ca.uhn.fhir.interceptor.api.Pointcut;
import ca.uhn.fhir.jpa.api.model.ResourceVersionConflictResolutionStrategy;
import ca.uhn.fhir.jpa.partition.SystemRequestDetails;
import ca.uhn.fhir.rest.api.server.RequestDetails;
import org.apache.commons.lang3.Validate;
import java.util.List;
import java.util.StringTokenizer;
import static org.apache.commons.lang3.StringUtils.isNotBlank;
@ -36,8 +39,8 @@ import static org.apache.commons.lang3.StringUtils.trim;
* if present, it will instruct the server to automatically retry JPA server operations that would have
* otherwise failed with a {@link ca.uhn.fhir.rest.server.exceptions.ResourceVersionConflictException} (HTTP 409).
* <p>
* The format of the header is:<br/>
* <code>X-Retry-On-Version-Conflict: retry; max-retries=100</code>
* The format of the header is:<br/>
* <code>X-Retry-On-Version-Conflict: retry; max-retries=100</code>
* </p>
*/
@Interceptor
@ -52,25 +55,28 @@ public class UserRequestRetryVersionConflictsInterceptor {
ResourceVersionConflictResolutionStrategy retVal = new ResourceVersionConflictResolutionStrategy();
if (theRequestDetails != null) {
for (String headerValue : theRequestDetails.getHeaders(HEADER_NAME)) {
if (isNotBlank(headerValue)) {
List<String> headers = theRequestDetails.getHeaders(HEADER_NAME);
if (headers != null) {
for (String headerValue : headers) {
if (isNotBlank(headerValue)) {
StringTokenizer tok = new StringTokenizer(headerValue, ";");
while (tok.hasMoreTokens()) {
String next = trim(tok.nextToken());
if (next.equals(RETRY)) {
retVal.setRetry(true);
} else if (next.startsWith(MAX_RETRIES + "=")) {
StringTokenizer tok = new StringTokenizer(headerValue, ";");
while (tok.hasMoreTokens()) {
String next = trim(tok.nextToken());
if (next.equals(RETRY)) {
retVal.setRetry(true);
} else if (next.startsWith(MAX_RETRIES + "=")) {
String val = trim(next.substring((MAX_RETRIES + "=").length()));
int maxRetries = Integer.parseInt(val);
maxRetries = Math.min(100, maxRetries);
retVal.setMaxRetries(maxRetries);
String val = trim(next.substring((MAX_RETRIES + "=").length()));
int maxRetries = Integer.parseInt(val);
maxRetries = Math.min(100, maxRetries);
retVal.setMaxRetries(maxRetries);
}
}
}
}
}
}
@ -79,4 +85,12 @@ public class UserRequestRetryVersionConflictsInterceptor {
}
/**
* Convenience method to add a retry header to a system request
*/
public static void addRetryHeader(SystemRequestDetails theRequestDetails, int theMaxRetries) {
Validate.inclusiveBetween(1, Integer.MAX_VALUE, theMaxRetries, "Max retries must be > 0");
String value = RETRY + "; " + MAX_RETRIES + "=" + theMaxRetries;
theRequestDetails.addHeader(HEADER_NAME, value);
}
}

View File

@ -33,12 +33,19 @@ import ca.uhn.fhir.rest.server.ElementsSupportEnum;
import ca.uhn.fhir.rest.server.IPagingProvider;
import ca.uhn.fhir.rest.server.IRestfulServerDefaults;
import ca.uhn.fhir.rest.server.interceptor.IServerInterceptor;
import com.google.common.collect.ArrayListMultimap;
import com.google.common.collect.ImmutableListMultimap;
import com.google.common.collect.ImmutableMultimap;
import com.google.common.collect.ListMultimap;
import com.google.common.collect.Multimap;
import com.google.common.collect.Multimaps;
import java.io.IOException;
import java.io.InputStream;
import java.io.Reader;
import java.nio.charset.Charset;
import java.util.List;
import java.util.Optional;
/**
* A default RequestDetails implementation that can be used for system calls to
@ -51,6 +58,8 @@ public class SystemRequestDetails extends RequestDetails {
super(new MyInterceptorBroadcaster());
}
private ListMultimap<String, String> myHeaders;
public SystemRequestDetails(IInterceptorBroadcaster theInterceptorBroadcaster) {
super(theInterceptorBroadcaster);
}
@ -72,14 +81,31 @@ public class SystemRequestDetails extends RequestDetails {
@Override
public String getHeader(String name) {
return null;
List<String> headers = getHeaders(name);
if (headers.isEmpty()) {
return null;
} else {
return headers.get(0);
}
}
@Override
public List<String> getHeaders(String name) {
return null;
ListMultimap<String, String> headers = myHeaders;
if (headers == null) {
headers = ImmutableListMultimap.of();
}
return headers.get(name);
}
public void addHeader(String theName, String theValue) {
if (myHeaders == null) {
myHeaders = ArrayListMultimap.create();
}
myHeaders.put(theName, theValue);
}
@Override
public Object getAttribute(String theAttributeName) {
return null;

View File

@ -148,6 +148,7 @@ public class BulkDataExportProviderTest {
assertThat(options.getFilters(), containsInAnyOrder("Patient?identifier=foo"));
}
@Test
public void testSuccessfulInitiateBulkRequest_Get() throws IOException {
@ -432,6 +433,22 @@ public class BulkDataExportProviderTest {
assertThat(responseBody, is(containsString("Resource types [StructureDefinition] are invalid for this type of export, as they do not contain search parameters that refer to patients.")));
}
@Test
public void testInitiateGroupExportWithNoResourceTypes() throws IOException {
IBulkDataExportSvc.JobInfo jobInfo = new IBulkDataExportSvc.JobInfo()
.setJobId(A_JOB_ID);
when(myBulkDataExportSvc.submitJob(any(), any())).thenReturn(jobInfo);
String url = "http://localhost:" + myPort + "/" + "Group/123/" +JpaConstants.OPERATION_EXPORT
+ "?" + JpaConstants.PARAM_EXPORT_OUTPUT_FORMAT + "=" + UrlUtil.escapeUrlParam(Constants.CT_FHIR_NDJSON);;
HttpGet get = new HttpGet(url);
get.addHeader(Constants.HEADER_PREFER, Constants.HEADER_PREFER_RESPOND_ASYNC);
CloseableHttpResponse execute = myClient.execute(get);
assertThat(execute.getStatusLine().getStatusCode(), is(equalTo(202)));
}
@Test
public void testInitiateWithPostAndMultipleTypeFilters() throws IOException {

View File

@ -19,9 +19,11 @@ import ca.uhn.fhir.jpa.entity.BulkExportCollectionEntity;
import ca.uhn.fhir.jpa.entity.BulkExportCollectionFileEntity;
import ca.uhn.fhir.jpa.entity.BulkExportJobEntity;
import ca.uhn.fhir.jpa.entity.MdmLink;
import ca.uhn.fhir.jpa.model.util.JpaConstants;
import ca.uhn.fhir.mdm.api.MdmLinkSourceEnum;
import ca.uhn.fhir.mdm.api.MdmMatchResultEnum;
import ca.uhn.fhir.rest.api.Constants;
import ca.uhn.fhir.rest.api.MethodOutcome;
import ca.uhn.fhir.rest.server.exceptions.InvalidRequestException;
import ca.uhn.fhir.util.UrlUtil;
import com.google.common.base.Charsets;
@ -37,6 +39,7 @@ import org.hl7.fhir.r4.model.Group;
import org.hl7.fhir.r4.model.Immunization;
import org.hl7.fhir.r4.model.InstantType;
import org.hl7.fhir.r4.model.Observation;
import org.hl7.fhir.r4.model.Parameters;
import org.hl7.fhir.r4.model.Patient;
import org.hl7.fhir.r4.model.Reference;
import org.junit.jupiter.api.Test;
@ -52,6 +55,7 @@ import org.springframework.batch.core.explore.JobExplorer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import java.io.IOException;
import java.util.Arrays;
import java.util.Collection;
import java.util.Date;
@ -404,6 +408,31 @@ public class BulkDataExportSvcImplR4Test extends BaseJpaR4Test {
}
}
@Test
public void testGroupExport_NoResourceTypesSpecified() {
createResources();
// Create a bulk job
BulkDataExportOptions bulkDataExportOptions = new BulkDataExportOptions();
bulkDataExportOptions.setOutputFormat(null);
bulkDataExportOptions.setSince(null);
bulkDataExportOptions.setFilters(null);
bulkDataExportOptions.setGroupId(myPatientGroupId);
bulkDataExportOptions.setExpandMdm(true);
bulkDataExportOptions.setExportStyle(BulkDataExportOptions.ExportStyle.GROUP);
IBulkDataExportSvc.JobInfo jobDetails = myBulkDataExportSvc.submitJob(bulkDataExportOptions);
myBulkDataExportSvc.buildExportFiles();
awaitAllBulkJobCompletions();
IBulkDataExportSvc.JobInfo jobInfo = myBulkDataExportSvc.getJobInfoOrThrowResourceNotFound(jobDetails.getJobId());
assertThat(jobInfo.getStatus(), equalTo(BulkJobStatusEnum.COMPLETE));
assertThat(jobInfo.getFiles().size(), equalTo(5));
}
@Test
public void testGenerateBulkExport_WithHas() {

View File

@ -2,6 +2,7 @@ package ca.uhn.fhir.jpa.dao.r4;
import ca.uhn.fhir.interceptor.executor.InterceptorService;
import ca.uhn.fhir.jpa.interceptor.UserRequestRetryVersionConflictsInterceptor;
import ca.uhn.fhir.jpa.partition.SystemRequestDetails;
import ca.uhn.fhir.jpa.searchparam.SearchParameterMap;
import ca.uhn.fhir.rest.api.PatchTypeEnum;
import ca.uhn.fhir.rest.api.server.IBundleProvider;
@ -17,7 +18,6 @@ import org.hl7.fhir.r4.model.Bundle;
import org.hl7.fhir.r4.model.CodeType;
import org.hl7.fhir.r4.model.Enumerations;
import org.hl7.fhir.r4.model.IdType;
import org.hl7.fhir.r4.model.IntegerType;
import org.hl7.fhir.r4.model.Parameters;
import org.hl7.fhir.r4.model.Patient;
import org.hl7.fhir.r4.model.SearchParameter;
@ -105,6 +105,72 @@ public class FhirResourceDaoR4ConcurrentWriteTest extends BaseJpaR4Test {
}
@Test
public void testCreateWithClientAssignedId_SystemRequest() throws InterruptedException {
myInterceptorRegistry.registerInterceptor(myRetryInterceptor);
List<Future<?>> futures = new ArrayList<>();
for (int i = 0; i < 5; i++) {
Patient p = new Patient();
p.setId("ABC");
p.setActive(true);
p.addIdentifier().setValue("VAL" + i);
Runnable task = () -> myPatientDao.update(p, new SystemRequestDetails());
Future<?> future = myExecutor.submit(task);
futures.add(future);
}
// Look for failures
for (Future<?> next : futures) {
try {
next.get();
ourLog.info("Future produced success");
} catch (ExecutionException e) {
ourLog.info("Future produced exception: {}", e.toString());
assertEquals(ResourceVersionConflictException.class, e.getCause().getClass());
}
}
// Make sure we saved the object
Patient patient = myPatientDao.read(new IdType("Patient/ABC"));
ourLog.info(myFhirCtx.newJsonParser().setPrettyPrint(true).encodeResourceToString(patient));
assertEquals(true, patient.getActive());
}
@Test
public void testCreateWithClientAssignedId_SystemRequestContainingRetryDirective() throws InterruptedException, ExecutionException {
myInterceptorRegistry.registerInterceptor(myRetryInterceptor);
SystemRequestDetails requestDetails = new SystemRequestDetails();
UserRequestRetryVersionConflictsInterceptor.addRetryHeader(requestDetails, 10);
List<Future<?>> futures = new ArrayList<>();
for (int i = 0; i < 5; i++) {
Patient p = new Patient();
p.setId("ABC");
p.setActive(true);
p.addIdentifier().setValue("VAL" + i);
Runnable task = () -> {
myPatientDao.update(p, requestDetails);
};
Future<?> future = myExecutor.submit(task);
futures.add(future);
}
// Should not fail
for (Future<?> next : futures) {
next.get();
ourLog.info("Future produced success");
}
// Make sure we saved the object
Patient patient = myPatientDao.read(new IdType("Patient/ABC"));
ourLog.info(myFhirCtx.newJsonParser().setPrettyPrint(true).encodeResourceToString(patient));
assertEquals(true, patient.getActive());
}
@Test
public void testCreateWithUniqueConstraint() {
SearchParameter sp = new SearchParameter();
@ -389,7 +455,6 @@ public class FhirResourceDaoR4ConcurrentWriteTest extends BaseJpaR4Test {
}
@Test
public void testTransactionWithCreate() {
myInterceptorRegistry.registerInterceptor(myRetryInterceptor);