Remove bundleValidationThreadCount (it's set by creating an appropriate executor)

This commit is contained in:
Ken Stevens 2021-11-07 15:19:10 -05:00
parent 47454554e3
commit 238027d303
3 changed files with 18 additions and 62 deletions

View File

@ -37,7 +37,6 @@ import java.util.ArrayList;
import java.util.Iterator; import java.util.Iterator;
import java.util.List; import java.util.List;
import java.util.concurrent.ExecutorService; import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future; import java.util.concurrent.Future;
import java.util.stream.Collectors; import java.util.stream.Collectors;
@ -58,7 +57,6 @@ public class FhirValidator {
private static final Logger ourLog = LoggerFactory.getLogger(FhirValidator.class); private static final Logger ourLog = LoggerFactory.getLogger(FhirValidator.class);
private static final String I18N_KEY_NO_PH_ERROR = FhirValidator.class.getName() + ".noPhError"; private static final String I18N_KEY_NO_PH_ERROR = FhirValidator.class.getName() + ".noPhError";
public static final int DEFAULT_BUNDLE_VALIDATION_THREADCOUNT = 1;
private static volatile Boolean ourPhPresentOnClasspath; private static volatile Boolean ourPhPresentOnClasspath;
private final FhirContext myContext; private final FhirContext myContext;
@ -66,9 +64,8 @@ public class FhirValidator {
private IInterceptorBroadcaster myInterceptorBraodcaster; private IInterceptorBroadcaster myInterceptorBraodcaster;
// FIXME KHS make it clear in the docs that bundle structure is not validated when this is true // FIXME KHS make it clear in the docs that bundle structure is not validated when this is true
private boolean myConcurrentBundleValidation; private boolean myConcurrentBundleValidation;
private int myBundleValidationThreadCount = DEFAULT_BUNDLE_VALIDATION_THREADCOUNT;
private ExecutorService myExecutor; private ExecutorService myExecutorService;
/** /**
* Constructor (this should not be called directly, but rather {@link FhirContext#newValidator()} should be called to obtain an instance of {@link FhirValidator}) * Constructor (this should not be called directly, but rather {@link FhirContext#newValidator()} should be called to obtain an instance of {@link FhirValidator})
@ -232,7 +229,11 @@ public class FhirValidator {
applyDefaultValidators(); applyDefaultValidators();
if (theResource instanceof IBaseBundle && myConcurrentBundleValidation) { if (theResource instanceof IBaseBundle && myConcurrentBundleValidation) {
return validateBundleEntriesConcurrently((IBaseBundle) theResource, theOptions); if (myExecutorService != null) {
return validateBundleEntriesConcurrently((IBaseBundle) theResource, theOptions);
} else {
ourLog.error("Concurrent Bundle Validation is enabled but ExecutorService is null. Reverting to serial validation.");
}
} }
return validateResource(theResource, theOptions); return validateResource(theResource, theOptions);
@ -241,10 +242,9 @@ public class FhirValidator {
private ValidationResult validateBundleEntriesConcurrently(IBaseBundle theBundle, ValidationOptions theOptions) { private ValidationResult validateBundleEntriesConcurrently(IBaseBundle theBundle, ValidationOptions theOptions) {
List<IBaseResource> entries = BundleUtil.toListOfResources(myContext, theBundle); List<IBaseResource> entries = BundleUtil.toListOfResources(myContext, theBundle);
ExecutorService executorService = getExecutorService();
List<Future<ValidationResult>> futures = new ArrayList<>(); List<Future<ValidationResult>> futures = new ArrayList<>();
for (IBaseResource entry : entries) { for (IBaseResource entry : entries) {
futures.add(executorService.submit(() -> validateResource(entry, theOptions))); futures.add(myExecutorService.submit(() -> validateResource(entry, theOptions)));
} }
List<SingleValidationMessage> validationMessages = new ArrayList<>(); List<SingleValidationMessage> validationMessages = new ArrayList<>();
@ -260,15 +260,6 @@ public class FhirValidator {
return new ValidationResult(myContext, validationMessages.stream().collect(Collectors.toList())); return new ValidationResult(myContext, validationMessages.stream().collect(Collectors.toList()));
} }
private ExecutorService getExecutorService() {
if (myExecutor == null) {
int size = myBundleValidationThreadCount;
ourLog.info("Creating FhirValidation thread pool with size {}", size);
myExecutor = Executors.newFixedThreadPool(size);
}
return myExecutor;
}
private ValidationResult validateResource(IBaseResource theResource, ValidationOptions theOptions) { private ValidationResult validateResource(IBaseResource theResource, ValidationOptions theOptions) {
IValidationContext<IBaseResource> ctx = ValidationContext.forResource(myContext, theResource, theOptions); IValidationContext<IBaseResource> ctx = ValidationContext.forResource(myContext, theResource, theOptions);
@ -314,7 +305,11 @@ public class FhirValidator {
IValidationContext<IBaseResource> ctx = ValidationContext.forText(myContext, theResource, theOptions); IValidationContext<IBaseResource> ctx = ValidationContext.forText(myContext, theResource, theOptions);
if (ctx.getResource() instanceof IBaseBundle && myConcurrentBundleValidation) { if (ctx.getResource() instanceof IBaseBundle && myConcurrentBundleValidation) {
return validateBundleEntriesConcurrently((IBaseBundle) ctx.getResource(), theOptions); if (myExecutorService != null) {
return validateBundleEntriesConcurrently((IBaseBundle) ctx.getResource(), theOptions);
} else {
ourLog.error("Concurrent Bundle Validation is enabled but ExecutorService is null. Reverting to serial validation.");
}
} }
for (IValidatorModule next : myValidators) { for (IValidatorModule next : myValidators) {
@ -336,8 +331,8 @@ public class FhirValidator {
} }
// FIXME KHS use this to set an executor that uses ThreadPoolUtil#newThreadPool // FIXME KHS use this to set an executor that uses ThreadPoolUtil#newThreadPool
public FhirValidator setExecutor(ExecutorService theExecutor) { public FhirValidator setExecutorService(ExecutorService theExecutorService) {
myExecutor = theExecutor; myExecutorService = theExecutorService;
return this; return this;
} }
@ -358,21 +353,4 @@ public class FhirValidator {
myConcurrentBundleValidation = theConcurrentBundleValidation; myConcurrentBundleValidation = theConcurrentBundleValidation;
return this; return this;
} }
/**
* The number of threads bundle entries will be validated within. This is only used when
* {@link #isConcurrentBundleValidation} is true.
*/
public int getBundleValidationThreadCount() {
return myBundleValidationThreadCount;
}
/**
* The number of threads bundle entries will be validated within. This is only used when
* {@link #isConcurrentBundleValidation} is true.
*/
public FhirValidator setBundleValidationThreadCount(int theBundleValidationThreadCount) {
myBundleValidationThreadCount = theBundleValidationThreadCount;
return this;
}
} }

View File

@ -279,12 +279,6 @@ public class DaoConfig {
*/ */
private boolean myAdvancedLuceneIndexing = false; private boolean myAdvancedLuceneIndexing = false;
/**
* @see FhirValidator#getBundleValidationThreadCount()
* @since 5.6.0
*/
private int myBundleValidationThreadCount = FhirValidator.DEFAULT_BUNDLE_VALIDATION_THREADCOUNT;
/** /**
* @see FhirValidator#isConcurrentBundleValidation() * @see FhirValidator#isConcurrentBundleValidation()
* @since 5.6.0 * @since 5.6.0
@ -2685,23 +2679,6 @@ public class DaoConfig {
myElasicSearchIndexPrefix = thePrefix; myElasicSearchIndexPrefix = thePrefix;
} }
/**
* @see FhirValidator#getBundleValidationThreadCount()
* @since 5.6.0
*/
public int getBundleValidationThreadCount() {
return myBundleValidationThreadCount;
}
/**
* @see FhirValidator#getBundleValidationThreadCount()
* @since 5.6.0
*/
public DaoConfig setBundleValidationThreadCount(int theBundleValidationThreadCount) {
myBundleValidationThreadCount = theBundleValidationThreadCount;
return this;
}
/** /**
* @see FhirValidator#isConcurrentBundleValidation() * @see FhirValidator#isConcurrentBundleValidation()
* @since 5.6.0 * @since 5.6.0

View File

@ -146,7 +146,7 @@ public class FhirInstanceValidatorR4Test extends BaseTest {
myFhirValidator.setValidateAgainstStandardSchema(false); myFhirValidator.setValidateAgainstStandardSchema(false);
myFhirValidator.setValidateAgainstStandardSchematron(false); myFhirValidator.setValidateAgainstStandardSchematron(false);
// This is only used if the validation is performed with validationOptions.isConcurrentBundleValidation = true // This is only used if the validation is performed with validationOptions.isConcurrentBundleValidation = true
myFhirValidator.setExecutor(Executors.newFixedThreadPool(4)); myFhirValidator.setExecutorService(Executors.newFixedThreadPool(4));
IValidationSupport mockSupport = mock(IValidationSupport.class); IValidationSupport mockSupport = mock(IValidationSupport.class);
when(mockSupport.getFhirContext()).thenReturn(ourCtx); when(mockSupport.getFhirContext()).thenReturn(ourCtx);
@ -1514,8 +1514,9 @@ public class FhirInstanceValidatorR4Test extends BaseTest {
assertThat(bundle.getEntry(), hasSize(entriesCount)); assertThat(bundle.getEntry(), hasSize(entriesCount));
try { try {
// RED-GREEN set ConcurrentBundleValidation to false to see the test fail
myFhirValidator.setConcurrentBundleValidation(true); myFhirValidator.setConcurrentBundleValidation(true);
myFhirValidator.setBundleValidationThreadCount(4); myFhirValidator.setExecutorService(Executors.newFixedThreadPool(4));
// Run once to exclude initialization from time // Run once to exclude initialization from time
myFhirValidator.validateWithResult(bundle); myFhirValidator.validateWithResult(bundle);
@ -1532,7 +1533,7 @@ public class FhirInstanceValidatorR4Test extends BaseTest {
assertEquals(0, all.size(), all.toString()); assertEquals(0, all.size(), all.toString());
} finally { } finally {
myFhirValidator.setConcurrentBundleValidation(false); myFhirValidator.setConcurrentBundleValidation(false);
myFhirValidator.setBundleValidationThreadCount(FhirValidator.DEFAULT_BUNDLE_VALIDATION_THREADCOUNT); myFhirValidator.setExecutorService(null);
} }
} }