From 7f2cf17f9f1040e9c391f6c47fd556a0cffff7ce Mon Sep 17 00:00:00 2001 From: James Date: Mon, 17 Apr 2017 17:29:32 -0400 Subject: [PATCH] Fix perf issue --- .../jpa/search/SearchCoordinatorSvcImpl.java | 185 ++++++++++-------- 1 file changed, 103 insertions(+), 82 deletions(-) diff --git a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/search/SearchCoordinatorSvcImpl.java b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/search/SearchCoordinatorSvcImpl.java index 05cc85290e6..82acaa04d2f 100644 --- a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/search/SearchCoordinatorSvcImpl.java +++ b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/search/SearchCoordinatorSvcImpl.java @@ -1,7 +1,19 @@ package ca.uhn.fhir.jpa.search; -import java.util.*; -import java.util.concurrent.*; +import java.util.ArrayList; +import java.util.Collections; +import java.util.Date; +import java.util.HashSet; +import java.util.Iterator; +import java.util.List; +import java.util.Set; +import java.util.UUID; +import java.util.concurrent.Callable; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; import javax.persistence.EntityManager; import javax.transaction.Transactional; @@ -15,9 +27,9 @@ import org.springframework.beans.factory.annotation.Autowired; import org.springframework.data.domain.Page; import org.springframework.data.domain.PageRequest; import org.springframework.data.domain.Pageable; -import org.springframework.http.HttpStatus; import org.springframework.scheduling.concurrent.CustomizableThreadFactory; import org.springframework.transaction.PlatformTransactionManager; +import org.springframework.transaction.TransactionDefinition; import org.springframework.transaction.TransactionStatus; import org.springframework.transaction.support.TransactionCallback; import org.springframework.transaction.support.TransactionCallbackWithoutResult; @@ -33,7 +45,11 @@ import ca.uhn.fhir.jpa.dao.SearchParameterMap; import ca.uhn.fhir.jpa.dao.data.ISearchDao; import ca.uhn.fhir.jpa.dao.data.ISearchIncludeDao; import ca.uhn.fhir.jpa.dao.data.ISearchResultDao; -import ca.uhn.fhir.jpa.entity.*; +import ca.uhn.fhir.jpa.entity.Search; +import ca.uhn.fhir.jpa.entity.SearchInclude; +import ca.uhn.fhir.jpa.entity.SearchResult; +import ca.uhn.fhir.jpa.entity.SearchStatusEnum; +import ca.uhn.fhir.jpa.entity.SearchTypeEnum; import ca.uhn.fhir.jpa.util.StopWatch; import ca.uhn.fhir.model.api.Include; import ca.uhn.fhir.rest.method.PageMethodBinding; @@ -42,7 +58,6 @@ import ca.uhn.fhir.rest.server.SimpleBundleProvider; import ca.uhn.fhir.rest.server.exceptions.BaseServerResponseException; import ca.uhn.fhir.rest.server.exceptions.InternalErrorException; import ca.uhn.fhir.rest.server.exceptions.ResourceGoneException; -import ca.uhn.fhir.util.ObjectUtil; public class SearchCoordinatorSvcImpl implements ISearchCoordinatorSvc { static final int DEFAULT_SYNC_SIZE = 250; @@ -55,15 +70,21 @@ public class SearchCoordinatorSvcImpl implements ISearchCoordinatorSvc { private EntityManager myEntityManager; private ExecutorService myExecutor; private final ConcurrentHashMap myIdToSearchTask = new ConcurrentHashMap(); - private long myMaxMillisToWaitForRemoteResults = DateUtils.MILLIS_PER_MINUTE; + private Integer myLoadingThrottleForUnitTests = null; + private long myMaxMillisToWaitForRemoteResults = DateUtils.MILLIS_PER_MINUTE; + private boolean myNeverUseLocalSearchForUnitTests; private final List myResultSizeLatch = new ArrayList(); @Autowired private ISearchDao mySearchDao; @Autowired private ISearchIncludeDao mySearchIncludeDao; + @Autowired private ISearchResultDao mySearchResultDao; + + private int mySyncSize = DEFAULT_SYNC_SIZE; + @Autowired private PlatformTransactionManager myTxManager; @@ -76,7 +97,7 @@ public class SearchCoordinatorSvcImpl implements ISearchCoordinatorSvc { } @Override - @Transactional(value=TxType.NOT_SUPPORTED) + @Transactional(value = TxType.NOT_SUPPORTED) public List getResources(final String theUuid, int theFrom, int theTo) { if (myNeverUseLocalSearchForUnitTests == false) { SearchTask task = myIdToSearchTask.get(theUuid); @@ -85,12 +106,13 @@ public class SearchCoordinatorSvcImpl implements ISearchCoordinatorSvc { } } + TransactionTemplate txTemplate = new TransactionTemplate(myTxManager); + txTemplate.setPropagationBehavior(TransactionDefinition.PROPAGATION_REQUIRES_NEW); + Search search; StopWatch sw = new StopWatch(); while (true) { - - TransactionTemplate txTemplate = new TransactionTemplate(myTxManager); - txTemplate.setPropagationBehavior(TransactionTemplate.PROPAGATION_REQUIRES_NEW); + search = txTemplate.execute(new TransactionCallback() { @Override public Search doInTransaction(TransactionStatus theStatus) { @@ -98,7 +120,6 @@ public class SearchCoordinatorSvcImpl implements ISearchCoordinatorSvc { } }); - if (search == null) { ourLog.info("Client requested unknown paging ID[{}]", theUuid); String msg = myContext.getLocalizer().getMessage(PageMethodBinding.class, "unknownSearchId", theUuid); @@ -126,17 +147,23 @@ public class SearchCoordinatorSvcImpl implements ISearchCoordinatorSvc { } } - Pageable page = toPage(theFrom, theTo); + final Pageable page = toPage(theFrom, theTo); if (page == null) { return Collections.emptyList(); } - Page searchResults = mySearchResultDao.findWithSearchUuid(search, page); - List retVal = new ArrayList(); - for (SearchResult next : searchResults) { - retVal.add(next.getResourcePid()); - } - + final Search foundSearch = search; + + List retVal = txTemplate.execute(new TransactionCallback>() { + @Override + public List doInTransaction(TransactionStatus theStatus) { + final List resultPids = new ArrayList(); + Page searchResults = mySearchResultDao.findWithSearchUuid(foundSearch, page); + for (SearchResult next : searchResults) { + resultPids.add(next.getResourcePid()); + } + return resultPids; } + }); return retVal; } @@ -152,7 +179,7 @@ public class SearchCoordinatorSvcImpl implements ISearchCoordinatorSvc { // Load the results synchronously List pids = new ArrayList(); - + Iterator resultIter = sb.createQuery(theParams); while (resultIter.hasNext()) { pids.add(resultIter.next()); @@ -224,11 +251,20 @@ public class SearchCoordinatorSvcImpl implements ISearchCoordinatorSvc { myEntityManager = theEntityManager; } + @VisibleForTesting + void setLoadingThrottleForUnitTests(Integer theLoadingThrottleForUnitTests) { + myLoadingThrottleForUnitTests = theLoadingThrottleForUnitTests; + } + @VisibleForTesting void setMaxMillisToWaitForRemoteResultsForUnitTest(long theMaxMillisToWaitForRemoteResults) { myMaxMillisToWaitForRemoteResults = theMaxMillisToWaitForRemoteResults; } + void setNeverUseLocalSearchForUnitTests(boolean theNeverUseLocalSearchForUnitTests) { + myNeverUseLocalSearchForUnitTests = theNeverUseLocalSearchForUnitTests; + } + @VisibleForTesting void setSearchDaoForUnitTest(ISearchDao theSearchDao) { mySearchDao = theSearchDao; @@ -244,47 +280,56 @@ public class SearchCoordinatorSvcImpl implements ISearchCoordinatorSvc { mySearchResultDao = theSearchResultDao; } + @VisibleForTesting + void setSyncSizeForUnitTests(int theSyncSize) { + mySyncSize = theSyncSize; + } + @VisibleForTesting void setTransactionManagerForUnitTest(PlatformTransactionManager theTxManager) { myTxManager = theTxManager; } + static Pageable toPage(final int theFromIndex, int theToIndex) { + int pageSize = theToIndex - theFromIndex; + if (pageSize < 1) { + return null; + } + + int pageIndex = theFromIndex / pageSize; + + Pageable page = new PageRequest(pageIndex, pageSize) { + private static final long serialVersionUID = 1L; + + @Override + public int getOffset() { + return theFromIndex; + } + }; + + return page; + } + static void verifySearchHasntFailedOrThrowInternalErrorException(Search theSearch) { if (theSearch.getStatus() == SearchStatusEnum.FAILED) { Integer status = theSearch.getFailureCode(); status = ObjectUtils.defaultIfNull(status, 500); - + String message = theSearch.getFailureMessage(); throw BaseServerResponseException.newInstance(status, message); } } - private int mySyncSize = DEFAULT_SYNC_SIZE; - - @VisibleForTesting - void setSyncSizeForUnitTests(int theSyncSize) { - mySyncSize = theSyncSize; - } - - @VisibleForTesting - void setLoadingThrottleForUnitTests(Integer theLoadingThrottleForUnitTests) { - myLoadingThrottleForUnitTests = theLoadingThrottleForUnitTests; - } - - private Integer myLoadingThrottleForUnitTests = null; - - private boolean myNeverUseLocalSearchForUnitTests; - public class SearchTask implements Callable { + private final IDao myCallingDao; private int myCountSaved = 0; private final CountDownLatch myInitialCollectionLatch = new CountDownLatch(1); + private SearchParameterMap myParams; + private String myResourceType; private final Search mySearch; private final ArrayList mySyncedPids = new ArrayList(); private final ArrayList myUnsyncedPids = new ArrayList(); - private final IDao myCallingDao; - private SearchParameterMap myParams; - private String myResourceType; public SearchTask(Search theSearch, IDao theCallingDao, SearchParameterMap theParams, String theResourceType) { mySearch = theSearch; @@ -294,7 +339,7 @@ public class SearchCoordinatorSvcImpl implements ISearchCoordinatorSvc { } public void awaitInitialSync() { - ourLog.info("Awaiting initial sync"); + ourLog.trace("Awaiting initial sync"); do { try { if (myInitialCollectionLatch.await(250, TimeUnit.MILLISECONDS)) { @@ -304,7 +349,7 @@ public class SearchCoordinatorSvcImpl implements ISearchCoordinatorSvc { throw new InternalErrorException(e); } } while (mySearch.getStatus() == SearchStatusEnum.LOADING); - ourLog.info("Initial sync completed"); + ourLog.trace("Initial sync completed"); } @Override @@ -329,18 +374,18 @@ public class SearchCoordinatorSvcImpl implements ISearchCoordinatorSvc { Throwable rootCause = ExceptionUtils.getRootCause(t); rootCause = ObjectUtils.defaultIfNull(rootCause, t); - + String failureMessage = rootCause.getMessage(); - + int failureCode = InternalErrorException.STATUS_CODE; if (t instanceof BaseServerResponseException) { failureCode = ((BaseServerResponseException) t).getStatusCode(); } - + mySearch.setFailureMessage(failureMessage); mySearch.setFailureCode(failureCode); mySearch.setStatus(SearchStatusEnum.FAILED); - + saveSearch(); } @@ -349,6 +394,17 @@ public class SearchCoordinatorSvcImpl implements ISearchCoordinatorSvc { return null; } + private void doSaveSearch() { + if (mySearch.getId() == null) { + mySearchDao.save(mySearch); + for (SearchInclude next : mySearch.getIncludes()) { + mySearchIncludeDao.save(next); + } + } else { + mySearchDao.save(mySearch); + } + } + private void doSearch() { Class resourceTypeClass = myContext.getResourceDefinition(myResourceType).getImplementingClass(); ISearchBuilder sb = myCallingDao.newSearchBuilder(); @@ -371,17 +427,6 @@ public class SearchCoordinatorSvcImpl implements ISearchCoordinatorSvc { saveUnsynced(theResultIter); } - private void doSaveSearch() { - if (mySearch.getId() == null) { - mySearchDao.save(mySearch); - for (SearchInclude next : mySearch.getIncludes()) { - mySearchIncludeDao.save(next); - } - } else { - mySearchDao.save(mySearch); - } - } - public List getResourcePids(int theFromIndex, int theToIndex) { ourLog.info("Requesting search PIDs from {}-{}", theFromIndex, theToIndex); @@ -389,7 +434,7 @@ public class SearchCoordinatorSvcImpl implements ISearchCoordinatorSvc { synchronized (mySyncedPids) { if (mySyncedPids.size() < theToIndex && mySearch.getStatus() == SearchStatusEnum.LOADING) { int latchSize = theToIndex - mySyncedPids.size(); - ourLog.info("Registering latch to await {} results (want {} total)", latchSize, theToIndex); + ourLog.trace("Registering latch to await {} results (want {} total)", latchSize, theToIndex); latch = new CountDownLatch(latchSize); myResultSizeLatch.add(latch); } @@ -455,7 +500,7 @@ public class SearchCoordinatorSvcImpl implements ISearchCoordinatorSvc { synchronized (mySyncedPids) { int numSyncedThisPass = myUnsyncedPids.size(); - ourLog.info("Syncing {} search results", numSyncedThisPass); + ourLog.trace("Syncing {} search results", numSyncedThisPass); mySyncedPids.addAll(myUnsyncedPids); myUnsyncedPids.clear(); @@ -488,28 +533,4 @@ public class SearchCoordinatorSvcImpl implements ISearchCoordinatorSvc { } - static Pageable toPage(final int theFromIndex, int theToIndex) { - int pageSize = theToIndex - theFromIndex; - if (pageSize < 1) { - return null; - } - - int pageIndex = theFromIndex / pageSize; - - Pageable page = new PageRequest(pageIndex, pageSize) { - private static final long serialVersionUID = 1L; - - @Override - public int getOffset() { - return theFromIndex; - } - }; - - return page; - } - - void setNeverUseLocalSearchForUnitTests(boolean theNeverUseLocalSearchForUnitTests) { - myNeverUseLocalSearchForUnitTests = theNeverUseLocalSearchForUnitTests; - } - }