Merge branch 'jpa_perf_enhancements' of github.com:jamesagnew/hapi-fhir into jpa_perf_enhancements

This commit is contained in:
James 2017-04-16 15:58:38 -04:00
commit 1d9e0dc4cc
8 changed files with 192 additions and 22 deletions

View File

@ -130,6 +130,11 @@
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-test</artifactId>
<scope>test</scope>
</dependency>
<!-- Server -->
<dependency>

View File

@ -109,8 +109,12 @@ public class PageMethodBinding extends BaseResourceReturningMethodBinding {
offsetI = 0;
}
int start = Math.min(offsetI, resultList.size() - 1);
Integer totalNum = resultList.size();
int start = offsetI;
if (totalNum != null) {
start = Math.min(start, totalNum - 1);
}
ResponseEncoding responseEncoding = RestfulServerUtils.determineResponseEncodingNoDefault(theRequest, theServer.getDefaultResponseEncoding());
boolean prettyPrint = RestfulServerUtils.prettyPrintResponse(theServer, theRequest);

View File

@ -58,7 +58,8 @@ public interface IBundleProvider {
/**
* Returns the total number of results which match the given query (exclusive of any
* _include's or OperationOutcome)
* _include's or OperationOutcome). May return {@literal null} if the total size is not
* known or would be too expensive to calculate.
*/
Integer size();

View File

@ -3,6 +3,10 @@ package ca.uhn.fhir.jpa.search;
import java.util.List;
import org.hl7.fhir.instance.model.api.IBaseResource;
import org.springframework.transaction.PlatformTransactionManager;
import org.springframework.transaction.TransactionStatus;
import org.springframework.transaction.support.TransactionCallback;
import org.springframework.transaction.support.TransactionTemplate;
import ca.uhn.fhir.jpa.dao.IDao;
import ca.uhn.fhir.jpa.dao.ISearchBuilder;
@ -16,20 +20,29 @@ public class PersistedJpaSearchFirstPageBundleProvider extends PersistedJpaBundl
private SearchTask mySearchTask;
private ISearchBuilder mySearchBuilder;
private Search mySearch;
private PlatformTransactionManager myTxManager;
public PersistedJpaSearchFirstPageBundleProvider(Search theSearch, IDao theDao, SearchTask theSearchTask, ISearchBuilder theSearchBuilder) {
public PersistedJpaSearchFirstPageBundleProvider(Search theSearch, IDao theDao, SearchTask theSearchTask, ISearchBuilder theSearchBuilder, PlatformTransactionManager theTxManager) {
super(theSearch.getUuid(), theDao);
setSearchEntity(theSearch);
mySearchTask = theSearchTask;
mySearchBuilder = theSearchBuilder;
mySearch = theSearch;
myTxManager = theTxManager;
}
@Override
public List<IBaseResource> getResources(int theFromIndex, int theToIndex) {
checkForFailedSearch();
List<Long> pids = mySearchTask.getResourcePids(theFromIndex, theToIndex);
return toResourceList(mySearchBuilder, pids);
final List<Long> pids = mySearchTask.getResourcePids(theFromIndex, theToIndex);
TransactionTemplate txTemplate = new TransactionTemplate(myTxManager);
txTemplate.setPropagationBehavior(TransactionTemplate.PROPAGATION_REQUIRED);
return txTemplate.execute(new TransactionCallback<List<IBaseResource>>() {
@Override
public List<IBaseResource> doInTransaction(TransactionStatus theStatus) {
return toResourceList(mySearchBuilder, pids);
}});
}
@Override

View File

@ -4,11 +4,11 @@ import java.util.*;
import java.util.concurrent.*;
import javax.persistence.EntityManager;
import javax.transaction.Transactional;
import javax.transaction.Transactional.TxType;
import org.apache.commons.lang3.exception.ExceptionUtils;
import org.apache.commons.lang3.time.DateUtils;
import org.hibernate.id.ResultSetIdentifierConsumer;
import org.hl7.fhir.instance.model.api.IBaseResource;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.domain.Page;
@ -17,9 +17,9 @@ import org.springframework.data.domain.Pageable;
import org.springframework.scheduling.concurrent.CustomizableThreadFactory;
import org.springframework.transaction.PlatformTransactionManager;
import org.springframework.transaction.TransactionStatus;
import org.springframework.transaction.support.TransactionCallback;
import org.springframework.transaction.support.TransactionCallbackWithoutResult;
import org.springframework.transaction.support.TransactionTemplate;
import org.springframework.web.servlet.ThemeResolver;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Lists;
@ -41,6 +41,8 @@ import ca.uhn.fhir.rest.server.exceptions.InternalErrorException;
import ca.uhn.fhir.rest.server.exceptions.ResourceGoneException;
public class SearchCoordinatorSvcImpl implements ISearchCoordinatorSvc {
static final int DEFAULT_SYNC_SIZE = 250;
private static final org.slf4j.Logger ourLog = org.slf4j.LoggerFactory.getLogger(SearchCoordinatorSvcImpl.class);
@Autowired
@ -70,16 +72,29 @@ public class SearchCoordinatorSvcImpl implements ISearchCoordinatorSvc {
}
@Override
public List<Long> getResources(String theUuid, int theFrom, int theTo) {
SearchTask task = myIdToSearchTask.get(theUuid);
if (task != null) {
return task.getResourcePids(theFrom, theTo);
@Transactional(value=TxType.NOT_SUPPORTED)
public List<Long> getResources(final String theUuid, int theFrom, int theTo) {
if (myNeverUseLocalSearchForUnitTests == false) {
SearchTask task = myIdToSearchTask.get(theUuid);
if (task != null) {
return task.getResourcePids(theFrom, theTo);
}
}
Search search;
StopWatch sw = new StopWatch();
while (true) {
search = mySearchDao.findByUuid(theUuid);
TransactionTemplate txTemplate = new TransactionTemplate(myTxManager);
txTemplate.setPropagationBehavior(TransactionTemplate.PROPAGATION_REQUIRES_NEW);
search = txTemplate.execute(new TransactionCallback<Search>() {
@Override
public Search doInTransaction(TransactionStatus theStatus) {
return mySearchDao.findByUuid(theUuid);
}
});
if (search == null) {
ourLog.info("Client requested unknown paging ID[{}]", theUuid);
String msg = myContext.getLocalizer().getMessage(PageMethodBinding.class, "unknownSearchId", theUuid);
@ -88,9 +103,11 @@ public class SearchCoordinatorSvcImpl implements ISearchCoordinatorSvc {
verifySearchHasntFailedOrThrowInternalErrorException(search);
if (search.getStatus() == SearchStatusEnum.FINISHED) {
ourLog.info("Search entity marked as finished");
break;
}
if (search.getNumFound() >= theTo) {
ourLog.info("Search entity has {} results so far", search.getNumFound());
break;
}
@ -180,7 +197,7 @@ public class SearchCoordinatorSvcImpl implements ISearchCoordinatorSvc {
myIdToSearchTask.put(search.getUuid(), task);
myExecutor.submit(task);
PersistedJpaSearchFirstPageBundleProvider retVal = new PersistedJpaSearchFirstPageBundleProvider(search, theCallingDao, task, sb);
PersistedJpaSearchFirstPageBundleProvider retVal = new PersistedJpaSearchFirstPageBundleProvider(search, theCallingDao, task, sb, myTxManager);
retVal.setContext(myContext);
retVal.setEntityManager(myEntityManager);
retVal.setPlatformTransactionManager(myTxManager);
@ -233,9 +250,24 @@ public class SearchCoordinatorSvcImpl implements ISearchCoordinatorSvc {
}
}
private int mySyncSize = DEFAULT_SYNC_SIZE;
@VisibleForTesting
void setSyncSizeForUnitTests(int theSyncSize) {
mySyncSize = theSyncSize;
}
@VisibleForTesting
void setLoadingThrottleForUnitTests(Integer theLoadingThrottleForUnitTests) {
myLoadingThrottleForUnitTests = theLoadingThrottleForUnitTests;
}
private Integer myLoadingThrottleForUnitTests = null;
private boolean myNeverUseLocalSearchForUnitTests;
public class SearchTask implements Callable<Void> {
private static final int SYNC_SIZE = 250;
private int myCountSaved = 0;
private final CountDownLatch myInitialCollectionLatch = new CountDownLatch(1);
private final Search mySearch;
@ -269,7 +301,7 @@ public class SearchCoordinatorSvcImpl implements ISearchCoordinatorSvc {
@Override
public Void call() throws Exception {
StopWatch sw = new StopWatch();
try {
TransactionTemplate txTemplate = new TransactionTemplate(myTxManager);
txTemplate.setPropagationBehavior(TransactionTemplate.PROPAGATION_REQUIRES_NEW);
@ -279,9 +311,9 @@ public class SearchCoordinatorSvcImpl implements ISearchCoordinatorSvc {
doSearch();
}
});
ourLog.info("Completed search for {} resources in {}ms", mySyncedPids.size(), sw.getMillis());
} catch (Throwable t) {
ourLog.error("Failed during search loading after {}ms", t, sw.getMillis());
myUnsyncedPids.clear();
@ -290,7 +322,7 @@ public class SearchCoordinatorSvcImpl implements ISearchCoordinatorSvc {
String failureMessage = ExceptionUtils.getRootCauseMessage(t);
mySearch.setFailureMessage(failureMessage);
saveSearch();
}
myIdToSearchTask.remove(mySearch.getUuid());
@ -305,9 +337,16 @@ public class SearchCoordinatorSvcImpl implements ISearchCoordinatorSvc {
while (theResultIter.hasNext()) {
myUnsyncedPids.add(theResultIter.next());
if (myUnsyncedPids.size() >= SYNC_SIZE) {
if (myUnsyncedPids.size() >= mySyncSize) {
saveUnsynced(theResultIter);
}
if (myLoadingThrottleForUnitTests != null) {
try {
Thread.sleep(myLoadingThrottleForUnitTests);
} catch (InterruptedException e) {
// ignore
}
}
}
saveUnsynced(theResultIter);
}
@ -449,4 +488,8 @@ public class SearchCoordinatorSvcImpl implements ISearchCoordinatorSvc {
return page;
}
void setNeverUseLocalSearchForUnitTests(boolean theNeverUseLocalSearchForUnitTests) {
myNeverUseLocalSearchForUnitTests = theNeverUseLocalSearchForUnitTests;
}
}

View File

@ -31,6 +31,7 @@ import ca.uhn.fhir.jpa.config.dstu3.WebsocketDstu3Config;
import ca.uhn.fhir.jpa.config.dstu3.WebsocketDstu3DispatcherConfig;
import ca.uhn.fhir.jpa.dao.dstu3.BaseJpaDstu3Test;
import ca.uhn.fhir.jpa.search.DatabaseBackedPagingProvider;
import ca.uhn.fhir.jpa.search.ISearchCoordinatorSvc;
import ca.uhn.fhir.jpa.testutil.RandomServerPortProvider;
import ca.uhn.fhir.jpa.validation.JpaValidationSupportChainDstu3;
import ca.uhn.fhir.narrative.DefaultThymeleafNarrativeGenerator;
@ -53,6 +54,7 @@ public abstract class BaseResourceProviderDstu3Test extends BaseJpaDstu3Test {
protected static String ourServerBase;
private static GenericWebApplicationContext ourWebApplicationContext;
private TerminologyUploaderProviderDstu3 myTerminologyUploaderProvider;
protected static ISearchCoordinatorSvc mySearchCoordinatorSvc;
public BaseResourceProviderDstu3Test() {
super();
@ -138,6 +140,7 @@ public abstract class BaseResourceProviderDstu3Test extends BaseJpaDstu3Test {
WebApplicationContext wac = WebApplicationContextUtils.getWebApplicationContext(subsServletHolder.getServlet().getServletConfig().getServletContext());
myValidationSupport = wac.getBean(JpaValidationSupportChainDstu3.class);
mySearchCoordinatorSvc = wac.getBean(ISearchCoordinatorSvc.class);
ourClient = myFhirCtx.newRestfulGenericClient(ourServerBase);
ourClient.registerInterceptor(new LoggingInterceptor(true));

View File

@ -0,0 +1,101 @@
package ca.uhn.fhir.jpa.search;
import static org.apache.commons.lang3.StringUtils.leftPad;
import static org.hamcrest.Matchers.contains;
import static org.junit.Assert.assertThat;
import org.hl7.fhir.dstu3.model.Bundle;
import org.hl7.fhir.dstu3.model.Patient;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Test;
import org.springframework.cglib.proxy.Proxy;
import org.springframework.test.util.AopTestUtils;
import ca.uhn.fhir.jpa.dao.DaoConfig;
import ca.uhn.fhir.jpa.provider.dstu3.BaseResourceProviderDstu3Test;
import ca.uhn.fhir.parser.StrictErrorHandler;
import ca.uhn.fhir.util.TestUtil;
public class PagingMultinodeProviderDstu3Test extends BaseResourceProviderDstu3Test {
private SearchCoordinatorSvcImpl mySearchCoordinatorSvcRaw;
@Override
@After
public void after() throws Exception {
super.after();
myDaoConfig.setAllowMultipleDelete(new DaoConfig().isAllowMultipleDelete());
myDaoConfig.setAllowExternalReferences(new DaoConfig().isAllowExternalReferences());
mySearchCoordinatorSvcRaw.setLoadingThrottleForUnitTests(null);
mySearchCoordinatorSvcRaw.setSyncSizeForUnitTests(SearchCoordinatorSvcImpl.DEFAULT_SYNC_SIZE);
mySearchCoordinatorSvcRaw.setNeverUseLocalSearchForUnitTests(false);
}
@Override
public void before() throws Exception {
super.before();
myFhirCtx.setParserErrorHandler(new StrictErrorHandler());
myDaoConfig.setAllowMultipleDelete(true);
mySearchCoordinatorSvcRaw = AopTestUtils.getTargetObject(mySearchCoordinatorSvc);
// mySearchCoordinatorSvcRaw = (SearchCoordinatorSvcImpl)mySearchCoordinatorSvc;
}
@Test
public void testSearch() {
{
for (int i = 0; i < 100; i++) {
Patient patient = new Patient();
String id = "A" + leftPad(Integer.toString(i), 3, '0');
patient.setId(id);
patient.addIdentifier().setSystem("urn:system").setValue("A" + i);
patient.addName().setFamily(id);
myPatientDao.update(patient, mySrd).getId().toUnqualifiedVersionless();
}
}
Bundle found;
mySearchCoordinatorSvcRaw.setLoadingThrottleForUnitTests(50);
mySearchCoordinatorSvcRaw.setSyncSizeForUnitTests(10);
mySearchCoordinatorSvcRaw.setNeverUseLocalSearchForUnitTests(true);
found = ourClient
.search()
.forResource(Patient.class)
.sort().ascending(Patient.SP_FAMILY)
.count(10)
.returnBundle(Bundle.class)
.execute();
assertThat(toUnqualifiedVersionlessIdValues(found), contains("Patient/A000", "Patient/A001", "Patient/A002", "Patient/A003", "Patient/A004", "Patient/A005", "Patient/A006", "Patient/A007", "Patient/A008", "Patient/A009"));
found = ourClient
.loadPage()
.next(found)
.execute();
assertThat(toUnqualifiedVersionlessIdValues(found), contains("Patient/A010", "Patient/A011", "Patient/A012", "Patient/A013", "Patient/A014", "Patient/A015", "Patient/A016", "Patient/A017", "Patient/A018", "Patient/A019"));
found = ourClient
.loadPage()
.next(found)
.execute();
assertThat(toUnqualifiedVersionlessIdValues(found), contains("Patient/A020", "Patient/A021", "Patient/A022", "Patient/A023", "Patient/A024", "Patient/A025", "Patient/A026", "Patient/A027", "Patient/A028", "Patient/A029"));
found = ourClient
.loadPage()
.next(found)
.execute();
assertThat(toUnqualifiedVersionlessIdValues(found), contains("Patient/A030", "Patient/A031", "Patient/A032", "Patient/A033", "Patient/A034", "Patient/A035", "Patient/A036", "Patient/A037", "Patient/A038", "Patient/A039"));
}
@AfterClass
public static void afterClassClearContext() {
TestUtil.clearAllStaticFieldsForUnitTest();
}
}

View File

@ -34,11 +34,11 @@
</logger>
<!-- Set to 'trace' to enable SQL logging -->
<logger name="org.hibernate.SQL" additivity="false" level="trace">
<logger name="org.hibernate.SQL" additivity="false" level="info">
<appender-ref ref="STDOUT" />
</logger>
<!-- Set to 'trace' to enable SQL Value logging -->
<logger name="org.hibernate.type" additivity="false" level="trace">
<logger name="org.hibernate.type" additivity="false" level="info">
<appender-ref ref="STDOUT" />
</logger>