move bundle concurrency config from validationsupport to request validator

This commit is contained in:
Ken Stevens 2021-11-07 13:26:59 -05:00
parent ed9dac5b1a
commit 9eb20cb668
12 changed files with 107 additions and 112 deletions

View File

@ -67,8 +67,6 @@ public class DefaultProfileValidationSupport implements IValidationSupport {
private Map<String, IBaseResource> myValueSets;
private List<String> myTerminologyResources;
private List<String> myStructureDefinitionResources;
private int myBundleValidationThreadCount = IValidationSupport.DEFAULT_BUNDLE_VALIDATION_THREADCOUNT;
private boolean myConcurrentBundleValidation;
/**
* Constructor
@ -382,24 +380,4 @@ public class DefaultProfileValidationSupport implements IValidationSupport {
ArrayList<IBaseResource> retVal = new ArrayList<>(theMap.values());
return (List<T>) Collections.unmodifiableList(retVal);
}
@Override
public boolean isConcurrentBundleValidation() {
return myConcurrentBundleValidation;
}
public DefaultProfileValidationSupport setConcurrentBundleValidation(boolean theConcurrentBundleValidation) {
myConcurrentBundleValidation = theConcurrentBundleValidation;
return this;
}
@Override
public int getBundleValidationThreadCount() {
return myBundleValidationThreadCount;
}
public DefaultProfileValidationSupport setBundleValidationThreadCount(int theBundleValidationThreadCount) {
myBundleValidationThreadCount = theBundleValidationThreadCount;
return this;
}
}

View File

@ -76,7 +76,6 @@ import static org.apache.commons.lang3.StringUtils.isNotBlank;
*/
public interface IValidationSupport {
String URL_PREFIX_VALUE_SET = "http://hl7.org/fhir/ValueSet/";
public static final int DEFAULT_BUNDLE_VALIDATION_THREADCOUNT = 1;
/**
@ -331,28 +330,6 @@ public interface IValidationSupport {
return null;
}
// FIXME KHS make it clear in the docs that bundle structure is not validated when this is true
/**
* If this is true, bundles will be validated in parallel threads. The bundle structure itself will not be validated,
* only the resources in its entries.
*
* @return
*/
default boolean isConcurrentBundleValidation() {
return false;
}
/**
* @return the number of threads bundle entries will be validated within. This is only used when
* {@link #isConcurrentBundleValidation} is true.
*/
default int getBundleValidationThreadCount() {
return DEFAULT_BUNDLE_VALIDATION_THREADCOUNT;
}
enum IssueSeverity {
/**
* The issue caused the action to fail, and no further checking could be performed.

View File

@ -226,7 +226,7 @@ public class FhirValidator {
applyDefaultValidators();
if (theResource instanceof IBaseBundle && myContext.getValidationSupport().isConcurrentBundleValidation()) {
if (theResource instanceof IBaseBundle && theOptions.isConcurrentBundleValidation()) {
return validateBundleEntriesConcurrently((IBaseBundle) theResource, theOptions);
}
@ -236,7 +236,7 @@ public class FhirValidator {
private ValidationResult validateBundleEntriesConcurrently(IBaseBundle theBundle, ValidationOptions theOptions) {
List<IBaseResource> entries = BundleUtil.toListOfResources(myContext, theBundle);
ExecutorService executorService = getExecutorService();
ExecutorService executorService = getExecutorService(theOptions);
List<Future<ValidationResult>> futures = new ArrayList<>();
for (IBaseResource entry : entries) {
futures.add(executorService.submit(() -> validateResource(entry, theOptions)));
@ -255,9 +255,9 @@ public class FhirValidator {
return new ValidationResult(myContext, validationMessages.stream().collect(Collectors.toList()));
}
private ExecutorService getExecutorService() {
private ExecutorService getExecutorService(ValidationOptions theOptions) {
if (myExecutor == null) {
int size = myContext.getValidationSupport().getBundleValidationThreadCount();
int size = theOptions.getBundleValidationThreadCount();
ourLog.info("Creating FhirValidation thread pool with size {}", size);
myExecutor = Executors.newFixedThreadPool(size);
}
@ -308,7 +308,7 @@ public class FhirValidator {
IValidationContext<IBaseResource> ctx = ValidationContext.forText(myContext, theResource, theOptions);
if (ctx.getResource() instanceof IBaseBundle && myContext.getValidationSupport().isConcurrentBundleValidation()) {
if (ctx.getResource() instanceof IBaseBundle && theOptions.isConcurrentBundleValidation()) {
return validateBundleEntriesConcurrently((IBaseBundle) ctx.getResource(), theOptions);
}

View File

@ -30,9 +30,14 @@ import static org.apache.commons.lang3.StringUtils.isNotBlank;
public class ValidationOptions {
public static final int DEFAULT_BUNDLE_VALIDATION_THREADCOUNT = 1;
private static ValidationOptions ourEmpty;
private Set<String> myProfiles;
// FIXME KHS make it clear in the docs that bundle structure is not validated when this is true
private boolean myConcurrentBundleValidation;
private int myBundleValidationThreadCount = DEFAULT_BUNDLE_VALIDATION_THREADCOUNT;
public Set<String> getProfiles() {
return myProfiles != null ? Collections.unmodifiableSet(myProfiles) : Collections.emptySet();
}
@ -54,6 +59,41 @@ public class ValidationOptions {
return this;
}
/**
* If this is true, bundles will be validated in parallel threads. The bundle structure itself will not be validated,
* only the resources in its entries.
*/
public boolean isConcurrentBundleValidation() {
return myConcurrentBundleValidation;
}
/**
* If this is true, bundles will be validated in parallel threads. The bundle structure itself will not be validated,
* only the resources in its entries.
*/
public ValidationOptions setConcurrentBundleValidation(boolean theConcurrentBundleValidation) {
myConcurrentBundleValidation = theConcurrentBundleValidation;
return this;
}
/**
* The number of threads bundle entries will be validated within. This is only used when
* {@link #isConcurrentBundleValidation} is true.
*/
public int getBundleValidationThreadCount() {
return myBundleValidationThreadCount;
}
/**
* The number of threads bundle entries will be validated within. This is only used when
* {@link #isConcurrentBundleValidation} is true.
*/
public ValidationOptions setBundleValidationThreadCount(int theBundleValidationThreadCount) {
myBundleValidationThreadCount = theBundleValidationThreadCount;
return this;
}
public static ValidationOptions empty() {
ValidationOptions retVal = ourEmpty;
if (retVal == null) {

View File

@ -67,10 +67,7 @@ public abstract class BaseConfigDstu3Plus extends BaseConfig {
@Bean(name = "myDefaultProfileValidationSupport")
public DefaultProfileValidationSupport defaultProfileValidationSupport(DaoConfig theDaoConfig) {
DefaultProfileValidationSupport retval = new DefaultProfileValidationSupport(fhirContext());
retval.setConcurrentBundleValidation(theDaoConfig.isConcurrentBundleValidation());
retval.setBundleValidationThreadCount(theDaoConfig.getBundleValidationThreadCount());
return retval;
return new DefaultProfileValidationSupport(fhirContext());
}
@Bean(name = JPA_VALIDATION_SUPPORT_CHAIN)

View File

@ -32,6 +32,7 @@ import ca.uhn.fhir.validation.FhirValidator;
import ca.uhn.fhir.validation.IValidatorModule;
import ca.uhn.fhir.validation.ResultSeverityEnum;
import ca.uhn.fhir.validation.SingleValidationMessage;
import ca.uhn.fhir.validation.ValidationOptions;
import ca.uhn.fhir.validation.ValidationResult;
import org.apache.commons.lang3.Validate;
import org.apache.commons.lang3.text.StrLookup;
@ -75,6 +76,7 @@ public abstract class BaseValidatingInterceptor<T> extends ValidationResultEnric
private List<IValidatorModule> myValidatorModules;
private FhirValidator myValidator;
private final ValidationOptions myValidationOptions = new ValidationOptions();
private void addResponseIssueHeader(RequestDetails theRequestDetails, SingleValidationMessage theNext) {
// Perform any string substitutions from the message format
@ -115,7 +117,7 @@ public abstract class BaseValidatingInterceptor<T> extends ValidationResultEnric
}
abstract ValidationResult doValidate(FhirValidator theValidator, T theRequest);
abstract ValidationResult doValidate(FhirValidator theValidator, T theRequest, ValidationOptions theValidationOptions);
/**
* Fail the request by throwing an {@link UnprocessableEntityException} as a result of a validation failure.
@ -324,7 +326,7 @@ public abstract class BaseValidatingInterceptor<T> extends ValidationResultEnric
ValidationResult validationResult;
try {
validationResult = doValidate(validator, theRequest);
validationResult = doValidate(validator, theRequest, myValidationOptions);
} catch (Exception e) {
if (myIgnoreValidatorExceptions) {
ourLog.warn("Validator threw an exception during validation", e);
@ -390,6 +392,14 @@ public abstract class BaseValidatingInterceptor<T> extends ValidationResultEnric
return validationResult;
}
protected void setConcurrentBundleValidation(boolean theConcurrentBundleValidation) {
myValidationOptions.setConcurrentBundleValidation(theConcurrentBundleValidation);
}
protected void setBundleValidationThreadCount(int theBundleValidationThreadCount) {
myValidationOptions.setBundleValidationThreadCount(theBundleValidationThreadCount);
}
private static class MyLookup extends StrLookup<String> {
private SingleValidationMessage myMessage;

View File

@ -30,6 +30,7 @@ import ca.uhn.fhir.rest.server.exceptions.UnprocessableEntityException;
import ca.uhn.fhir.rest.server.method.ResourceParameter;
import ca.uhn.fhir.validation.FhirValidator;
import ca.uhn.fhir.validation.ResultSeverityEnum;
import ca.uhn.fhir.validation.ValidationOptions;
import ca.uhn.fhir.validation.ValidationResult;
import javax.servlet.http.HttpServletRequest;
@ -53,8 +54,8 @@ public class RequestValidatingInterceptor extends BaseValidatingInterceptor<Stri
private boolean myAddValidationResultsToResponseOperationOutcome = true;
@Override
ValidationResult doValidate(FhirValidator theValidator, String theRequest) {
return theValidator.validateWithResult(theRequest);
ValidationResult doValidate(FhirValidator theValidator, String theRequest, ValidationOptions theValidationOptions) {
return theValidator.validateWithResult(theRequest, theValidationOptions);
}
@Hook(Pointcut.SERVER_INCOMING_REQUEST_POST_PROCESSED)

View File

@ -20,20 +20,20 @@ package ca.uhn.fhir.rest.server.interceptor;
* #L%
*/
import java.util.HashSet;
import java.util.Set;
import ca.uhn.fhir.interceptor.api.Hook;
import ca.uhn.fhir.interceptor.api.Pointcut;
import org.apache.commons.lang3.Validate;
import org.hl7.fhir.instance.model.api.IBaseResource;
import ca.uhn.fhir.rest.api.RestOperationTypeEnum;
import ca.uhn.fhir.rest.api.server.RequestDetails;
import ca.uhn.fhir.rest.server.exceptions.UnprocessableEntityException;
import ca.uhn.fhir.validation.FhirValidator;
import ca.uhn.fhir.validation.ResultSeverityEnum;
import ca.uhn.fhir.validation.ValidationOptions;
import ca.uhn.fhir.validation.ValidationResult;
import org.apache.commons.lang3.Validate;
import org.hl7.fhir.instance.model.api.IBaseResource;
import java.util.HashSet;
import java.util.Set;
/**
* This interceptor intercepts each outgoing response and if it contains a FHIR resource, validates that resource. The interceptor may be configured to run any validator modules, and will then add
@ -62,8 +62,8 @@ public class ResponseValidatingInterceptor extends BaseValidatingInterceptor<IBa
}
@Override
ValidationResult doValidate(FhirValidator theValidator, IBaseResource theRequest) {
return theValidator.validateWithResult(theRequest);
ValidationResult doValidate(FhirValidator theValidator, IBaseResource theRequest, ValidationOptions theValidationOptions) {
return theValidator.validateWithResult(theRequest, theValidationOptions);
}
@Hook(Pointcut.SERVER_OUTGOING_RESPONSE)

View File

@ -1,12 +1,12 @@
package ca.uhn.fhir.jpa.api.config;
import ca.uhn.fhir.context.support.IValidationSupport;
import ca.uhn.fhir.jpa.api.model.HistoryCountModeEnum;
import ca.uhn.fhir.jpa.api.model.WarmCacheEntry;
import ca.uhn.fhir.jpa.model.entity.ModelConfig;
import ca.uhn.fhir.jpa.model.entity.ResourceEncodingEnum;
import ca.uhn.fhir.rest.api.SearchTotalModeEnum;
import ca.uhn.fhir.util.HapiExtensions;
import ca.uhn.fhir.validation.ValidationOptions;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Sets;
import org.apache.commons.lang3.StringUtils;
@ -280,13 +280,13 @@ public class DaoConfig {
private boolean myAdvancedLuceneIndexing = false;
/**
* @see IValidationSupport#getBundleValidationThreadCount()
* @see ValidationOptions#getBundleValidationThreadCount()
* @since 5.6.0
*/
private int myBundleValidationThreadCount = IValidationSupport.DEFAULT_BUNDLE_VALIDATION_THREADCOUNT;
private int myBundleValidationThreadCount = ValidationOptions.DEFAULT_BUNDLE_VALIDATION_THREADCOUNT;
/**
* @see IValidationSupport#isConcurrentBundleValidation()
* @see ValidationOptions#isConcurrentBundleValidation()
* @since 5.6.0
*/
private boolean myConcurrentBundleValidation;
@ -2686,7 +2686,7 @@ public class DaoConfig {
}
/**
* @see IValidationSupport#getBundleValidationThreadCount()
* @see ValidationOptions#getBundleValidationThreadCount()
* @since 5.6.0
*/
public int getBundleValidationThreadCount() {
@ -2694,7 +2694,7 @@ public class DaoConfig {
}
/**
* @see IValidationSupport#getBundleValidationThreadCount()
* @see ValidationOptions#getBundleValidationThreadCount()
* @since 5.6.0
*/
public DaoConfig setBundleValidationThreadCount(int theBundleValidationThreadCount) {
@ -2703,7 +2703,7 @@ public class DaoConfig {
}
/**
* @see IValidationSupport#isConcurrentBundleValidation()
* @see ValidationOptions#isConcurrentBundleValidation()
* @since 5.6.0
*/
public boolean isConcurrentBundleValidation() {
@ -2711,7 +2711,7 @@ public class DaoConfig {
}
/**
* @see IValidationSupport#isConcurrentBundleValidation()
* @see ValidationOptions#isConcurrentBundleValidation()
* @since 5.6.0
*/
public DaoConfig setConcurrentBundleValidation(boolean theConcurrentBundleValidation) {

View File

@ -111,14 +111,4 @@ public class BaseValidationSupportWrapper extends BaseValidationSupport {
public TranslateConceptResults translateConcept(TranslateCodeRequest theRequest) {
return myWrap.translateConcept(theRequest);
}
@Override
public boolean isConcurrentBundleValidation() {
return myWrap.isConcurrentBundleValidation();
}
@Override
public int getBundleValidationThreadCount() {
return myWrap.getBundleValidationThreadCount();
}
}

View File

@ -302,14 +302,4 @@ public class ValidationSupportChain implements IValidationSupport {
}
return null;
}
@Override
public boolean isConcurrentBundleValidation() {
for (IValidationSupport next : myChain) {
if (next.isConcurrentBundleValidation()) {
return true;
}
}
return false;
}
}

View File

@ -15,6 +15,7 @@ import ca.uhn.fhir.util.TestUtil;
import ca.uhn.fhir.validation.FhirValidator;
import ca.uhn.fhir.validation.ResultSeverityEnum;
import ca.uhn.fhir.validation.SingleValidationMessage;
import ca.uhn.fhir.validation.ValidationOptions;
import ca.uhn.fhir.validation.ValidationResult;
import com.google.common.base.Charsets;
import org.apache.commons.io.IOUtils;
@ -65,6 +66,7 @@ import org.hl7.fhir.r4.utils.FHIRPathEngine;
import org.hl7.fhir.r5.utils.IResourceValidator;
import org.hl7.fhir.utilities.validation.ValidationMessage;
import org.hl7.fhir.utilities.xhtml.XhtmlNode;
import org.jetbrains.annotations.NotNull;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Disabled;
@ -145,6 +147,8 @@ public class FhirInstanceValidatorR4Test extends BaseTest {
myVal = ourCtx.newValidator();
myVal.setValidateAgainstStandardSchema(false);
myVal.setValidateAgainstStandardSchematron(false);
// This is only used if the validation is performed with validationOptions.isConcurrentBundleValidation = true
myVal.setExecutor(Executors.newFixedThreadPool(4));
IValidationSupport mockSupport = mock(IValidationSupport.class);
when(mockSupport.getFhirContext()).thenReturn(ourCtx);
@ -1501,6 +1505,7 @@ public class FhirInstanceValidatorR4Test extends BaseTest {
@Test
public void testValidateBundleMultithreaded() throws IOException {
// setup
StructureDefinition sd = loadStructureDefinition(myDefaultValidationSupport, "/r4/multithread/StructureDefinitionPatientV1.json");
myStructureDefinitionMap.put("https://example.com/StructureDefinition/Patient-v1", sd);
@ -1510,24 +1515,31 @@ public class FhirInstanceValidatorR4Test extends BaseTest {
// We deliberately create an invalid bundle to confirm we are indeed running multithreaded
Bundle bundle = buildBundle(entriesCount, false);
assertThat(bundle.getEntry(), hasSize(entriesCount));
try {
myDefaultValidationSupport.setConcurrentBundleValidation(true);
myVal.setExecutor(Executors.newFixedThreadPool(4));
ValidationOptions validationOptions = buildValidationOptions();
// Run once to exclude initialization from time
myVal.validateWithResult(bundle);
myVal.validateWithResult(bundle, validationOptions);
// execute
StopWatch stopwatch = new StopWatch();
ValidationResult output = myVal.validateWithResult(bundle);
ValidationResult output = myVal.validateWithResult(bundle, validationOptions);
ourLog.info("Validation time: {}", stopwatch);
// validate
List<SingleValidationMessage> all = logResultsAndReturnErrorOnes(output);
assertThat(output.getMessages(), hasSize(entriesCount * 2));
// This assert proves that we did a multi-threaded validation since the outer bundle fails validation
// due to lack of unique fullUrl values on the entries. If you setConcurrentBundleValidation(false)
// above this test will fail.
assertEquals(0, all.size(), all.toString());
} finally {
myVal.setExecutor(null);
myDefaultValidationSupport.setConcurrentBundleValidation(false);
}
@NotNull
private ValidationOptions buildValidationOptions() {
ValidationOptions validationOptions = new ValidationOptions();
validationOptions.setConcurrentBundleValidation(true);
validationOptions.setBundleValidationThreadCount(4);
return validationOptions;
}
private Bundle buildBundle(int theSize, boolean theValidBundle) throws IOException {