Avoid deadlock in JPA server when creating a large number of resources

and using the RequestValidatingInterceptor
This commit is contained in:
James Agnew 2017-07-04 21:53:26 -04:00
parent 294d080bd3
commit c1d06084b7
6 changed files with 196 additions and 6 deletions

View File

@ -1,5 +1,8 @@
package ca.uhn.fhir.jpa.dao;
import javax.transaction.Transactional;
import javax.transaction.Transactional.TxType;
/*
* #%L
* HAPI FHIR JPA Server
@ -31,6 +34,7 @@ import ca.uhn.fhir.context.FhirContext;
import ca.uhn.fhir.rest.param.UriParam;
import ca.uhn.fhir.rest.server.IBundleProvider;
@Transactional(value=TxType.REQUIRED)
public class JpaValidationSupportDstu2 implements IJpaValidationSupportDstu2 {
private static final org.slf4j.Logger ourLog = org.slf4j.LoggerFactory.getLogger(JpaValidationSupportDstu2.class);
@ -52,11 +56,13 @@ public class JpaValidationSupportDstu2 implements IJpaValidationSupportDstu2 {
private FhirContext myDstu2Ctx;
@Override
@Transactional(value=TxType.SUPPORTS)
public ValueSetExpansionComponent expandValueSet(FhirContext theCtx, ConceptSetComponent theInclude) {
return null;
}
@Override
@Transactional(value=TxType.SUPPORTS)
public ValueSet fetchCodeSystem(FhirContext theCtx, String theSystem) {
return null;
}
@ -95,11 +101,13 @@ public class JpaValidationSupportDstu2 implements IJpaValidationSupportDstu2 {
}
@Override
@Transactional(value=TxType.SUPPORTS)
public boolean isCodeSystemSupported(FhirContext theCtx, String theSystem) {
return false;
}
@Override
@Transactional(value=TxType.SUPPORTS)
public CodeValidationResult validateCode(FhirContext theCtx, String theCodeSystem, String theCode, String theDisplay) {
return null;
}

View File

@ -3,11 +3,10 @@ package ca.uhn.fhir.jpa.dao.dstu3;
import java.util.Collections;
import java.util.List;
import org.hl7.fhir.dstu3.model.CodeSystem;
import org.hl7.fhir.dstu3.model.IdType;
import org.hl7.fhir.dstu3.model.Questionnaire;
import org.hl7.fhir.dstu3.model.StructureDefinition;
import org.hl7.fhir.dstu3.model.ValueSet;
import javax.transaction.Transactional;
import javax.transaction.Transactional.TxType;
import org.hl7.fhir.dstu3.model.*;
import org.hl7.fhir.dstu3.model.ValueSet.ConceptSetComponent;
import org.hl7.fhir.dstu3.model.ValueSet.ValueSetExpansionComponent;
import org.hl7.fhir.instance.model.api.IAnyResource;
@ -43,6 +42,7 @@ import ca.uhn.fhir.rest.param.StringParam;
import ca.uhn.fhir.rest.param.UriParam;
import ca.uhn.fhir.rest.server.IBundleProvider;
@Transactional(value=TxType.REQUIRED)
public class JpaValidationSupportDstu3 implements IJpaValidationSupportDstu3 {
private static final org.slf4j.Logger ourLog = org.slf4j.LoggerFactory.getLogger(JpaValidationSupportDstu3.class);
@ -72,6 +72,7 @@ public class JpaValidationSupportDstu3 implements IJpaValidationSupportDstu3 {
@Override
@Transactional(value=TxType.SUPPORTS)
public ValueSetExpansionComponent expandValueSet(FhirContext theCtx, ConceptSetComponent theInclude) {
return null;
}
@ -81,6 +82,7 @@ public class JpaValidationSupportDstu3 implements IJpaValidationSupportDstu3 {
return fetchResource(theCtx, CodeSystem.class, theSystem);
}
@SuppressWarnings("unchecked")
@Override
public <T extends IBaseResource> T fetchResource(FhirContext theContext, Class<T> theClass, String theUri) {
IdType id = new IdType(theUri);
@ -143,11 +145,13 @@ public class JpaValidationSupportDstu3 implements IJpaValidationSupportDstu3 {
}
@Override
@Transactional(value=TxType.SUPPORTS)
public boolean isCodeSystemSupported(FhirContext theCtx, String theSystem) {
return false;
}
@Override
@Transactional(value=TxType.SUPPORTS)
public CodeValidationResult validateCode(FhirContext theCtx, String theCodeSystem, String theCode, String theDisplay) {
return null;
}
@ -160,6 +164,7 @@ public class JpaValidationSupportDstu3 implements IJpaValidationSupportDstu3 {
@Override
@Transactional(value=TxType.SUPPORTS)
public List<StructureDefinition> fetchAllStructureDefinitions(FhirContext theContext) {
return Collections.emptyList();
}

View File

@ -37,6 +37,7 @@ public class TestDstu3Config extends BaseJavaConfigDstu3 {
retVal.setUrl("jdbc:derby:memory:myUnitTestDB;create=true");
retVal.setUsername("");
retVal.setPassword("");
retVal.setMaxTotal(4);
DataSource dataSource = ProxyDataSourceBuilder
.create(retVal)

View File

@ -31,6 +31,7 @@ import ca.uhn.fhir.jpa.config.dstu3.WebsocketDstu3Config;
import ca.uhn.fhir.jpa.config.dstu3.WebsocketDstu3DispatcherConfig;
import ca.uhn.fhir.jpa.dao.data.ISearchDao;
import ca.uhn.fhir.jpa.dao.dstu3.BaseJpaDstu3Test;
import ca.uhn.fhir.jpa.dao.dstu3.SearchParamRegistryDstu3;
import ca.uhn.fhir.jpa.interceptor.RestHookSubscriptionDstu3Interceptor;
import ca.uhn.fhir.jpa.search.DatabaseBackedPagingProvider;
import ca.uhn.fhir.jpa.search.ISearchCoordinatorSvc;
@ -47,7 +48,7 @@ import ca.uhn.fhir.util.TestUtil;
public abstract class BaseResourceProviderDstu3Test extends BaseJpaDstu3Test {
private static JpaValidationSupportChainDstu3 myValidationSupport;
protected static JpaValidationSupportChainDstu3 myValidationSupport;
protected static IGenericClient ourClient;
protected static CloseableHttpClient ourHttpClient;
protected static int ourPort;
@ -56,6 +57,7 @@ public abstract class BaseResourceProviderDstu3Test extends BaseJpaDstu3Test {
protected static String ourServerBase;
private static GenericWebApplicationContext ourWebApplicationContext;
private TerminologyUploaderProviderDstu3 myTerminologyUploaderProvider;
protected static SearchParamRegistryDstu3 ourSearchParamRegistry;
protected static DatabaseBackedPagingProvider ourPagingProvider;
protected static RestHookSubscriptionDstu3Interceptor ourRestHookSubscriptionInterceptor;
protected static ISearchDao mySearchEntityDao;
@ -150,7 +152,9 @@ public abstract class BaseResourceProviderDstu3Test extends BaseJpaDstu3Test {
mySearchCoordinatorSvc = wac.getBean(ISearchCoordinatorSvc.class);
mySearchEntityDao = wac.getBean(ISearchDao.class);
ourRestHookSubscriptionInterceptor = wac.getBean(RestHookSubscriptionDstu3Interceptor.class);
ourSearchParamRegistry = wac.getBean(SearchParamRegistryDstu3.class);
myFhirCtx.getRestfulClientFactory().setSocketTimeout(5000000);
ourClient = myFhirCtx.newRestfulGenericClient(ourServerBase);
if (shouldLogClient()) {
ourClient.registerInterceptor(new LoggingInterceptor(true));
@ -159,6 +163,7 @@ public abstract class BaseResourceProviderDstu3Test extends BaseJpaDstu3Test {
PoolingHttpClientConnectionManager connectionManager = new PoolingHttpClientConnectionManager(5000, TimeUnit.MILLISECONDS);
HttpClientBuilder builder = HttpClientBuilder.create();
builder.setConnectionManager(connectionManager);
builder.setMaxConnPerRoute(99);
ourHttpClient = builder.build();
ourServer = server;

View File

@ -0,0 +1,166 @@
package ca.uhn.fhir.jpa.provider.dstu3;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.fail;
import java.util.List;
import java.util.UUID;
import org.apache.commons.io.IOUtils;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpGet;
import org.hl7.fhir.dstu3.hapi.validation.FhirInstanceValidator;
import org.hl7.fhir.dstu3.model.*;
import org.hl7.fhir.dstu3.model.Bundle.BundleType;
import org.hl7.fhir.dstu3.model.Bundle.HTTPVerb;
import org.junit.*;
import com.google.common.collect.Lists;
import ca.uhn.fhir.rest.server.interceptor.RequestValidatingInterceptor;
import ca.uhn.fhir.util.TestUtil;
public class StressTestDstu3Test extends BaseResourceProviderDstu3Test {
private static final org.slf4j.Logger ourLog = org.slf4j.LoggerFactory.getLogger(StressTestDstu3Test.class);
private RequestValidatingInterceptor myRequestValidatingInterceptor;
@Before
public void before() throws Exception {
super.before();
myRequestValidatingInterceptor = new RequestValidatingInterceptor();
FhirInstanceValidator module = new FhirInstanceValidator();
module.setValidationSupport(myValidationSupport);
myRequestValidatingInterceptor.addValidatorModule(module);
}
@After
public void after() throws Exception {
super.after();
ourRestServer.unregisterInterceptor(myRequestValidatingInterceptor);
}
/**
* This test prevents a deadlock that was detected with a large number of
* threads creating resources and blocking on the searchparamcache refreshing
* (since this is a synchronized method) while the instance that was actually
* executing was waiting on a DB connection. This was solved by making
* JpaValidationSupportDstuXX be transactional, which it should have been
* anyhow.
*/
@Test
public void testMultithreadedSearchWithValidation() throws Exception {
ourRestServer.registerInterceptor(myRequestValidatingInterceptor);
Bundle input = new Bundle();
input.setType(BundleType.TRANSACTION);
for (int i = 0; i < 500; i++) {
Patient p = new Patient();
p.addIdentifier().setSystem("http://test").setValue("BAR");
input.addEntry().setResource(p).getRequest().setMethod(HTTPVerb.POST).setUrl("Patient");
}
ourClient.transaction().withBundle(input).execute();
CloseableHttpResponse getMeta = ourHttpClient.execute(new HttpGet(ourServerBase + "/metadata"));
try {
assertEquals(200, getMeta.getStatusLine().getStatusCode());
} finally {
IOUtils.closeQuietly(getMeta);
}
List<BaseTask> tasks = Lists.newArrayList();
try {
for (int threadIndex = 0; threadIndex < 8; threadIndex++) {
SearchTask task = new SearchTask();
tasks.add(task);
task.start();
}
for (int threadIndex = 0; threadIndex < 8; threadIndex++) {
CreateTask task = new CreateTask();
tasks.add(task);
task.start();
}
} finally {
for (BaseTask next : tasks) {
next.join();
}
}
int total = 0;
for (BaseTask next : tasks) {
if (next.getError() != null) {
fail(next.getError().toString());
}
total += next.getTaskCount();
}
ourLog.info("Loaded {} searches", total);
}
@AfterClass
public static void afterClassClearContext() {
TestUtil.clearAllStaticFieldsForUnitTest();
}
public class BaseTask extends Thread {
protected Throwable myError;
protected int myTaskCount = 0;
public Throwable getError() {
return myError;
}
public int getTaskCount() {
return myTaskCount;
}
}
private final class SearchTask extends BaseTask {
@Override
public void run() {
CloseableHttpResponse get = null;
for (int i = 0; i < 20; i++) {
try {
get = ourHttpClient.execute(new HttpGet(ourServerBase + "/Patient?identifier=http%3A%2F%2Ftest%7CBAR," + UUID.randomUUID().toString()));
try {
assertEquals(200, get.getStatusLine().getStatusCode());
myTaskCount++;
} finally {
IOUtils.closeQuietly(get);
}
} catch (Throwable e) {
ourLog.error("Failure during search", e);
myError = e;
return;
}
}
}
}
private final class CreateTask extends BaseTask {
@Override
public void run() {
for (int i = 0; i < 50; i++) {
try {
Patient p = new Patient();
p.addIdentifier().setSystem("http://test").setValue("BAR").setType(new CodeableConcept().addCoding(new Coding().setSystem("http://foo").setCode("bar")));
p.setGender(org.hl7.fhir.dstu3.model.Enumerations.AdministrativeGender.MALE);
ourClient.create().resource(p).execute();
ourSearchParamRegistry.forceRefresh();
} catch (Throwable e) {
ourLog.error("Failure during search", e);
myError = e;
return;
}
}
}
}
}

View File

@ -120,6 +120,11 @@
JpaConformanceProvider now has a configuration setting to enable and
disable adding resource counts to the server metadata.
</action>
<action type="fix">
Avoid a deadlock in JPA server when the RequestValidatingInterceptor is being
used and a large number of resources are being created by clients at
the same time.
</action>
</release>
<release version="2.5" date="2017-06-08">
<action type="fix">