Fix perf issue

This commit is contained in:
James 2017-04-17 17:29:32 -04:00
parent 5adc09ad56
commit 7f2cf17f9f
1 changed files with 103 additions and 82 deletions

View File

@ -1,7 +1,19 @@
package ca.uhn.fhir.jpa.search; package ca.uhn.fhir.jpa.search;
import java.util.*; import java.util.ArrayList;
import java.util.concurrent.*; import java.util.Collections;
import java.util.Date;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import javax.persistence.EntityManager; import javax.persistence.EntityManager;
import javax.transaction.Transactional; import javax.transaction.Transactional;
@ -15,9 +27,9 @@ import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.domain.Page; import org.springframework.data.domain.Page;
import org.springframework.data.domain.PageRequest; import org.springframework.data.domain.PageRequest;
import org.springframework.data.domain.Pageable; import org.springframework.data.domain.Pageable;
import org.springframework.http.HttpStatus;
import org.springframework.scheduling.concurrent.CustomizableThreadFactory; import org.springframework.scheduling.concurrent.CustomizableThreadFactory;
import org.springframework.transaction.PlatformTransactionManager; import org.springframework.transaction.PlatformTransactionManager;
import org.springframework.transaction.TransactionDefinition;
import org.springframework.transaction.TransactionStatus; import org.springframework.transaction.TransactionStatus;
import org.springframework.transaction.support.TransactionCallback; import org.springframework.transaction.support.TransactionCallback;
import org.springframework.transaction.support.TransactionCallbackWithoutResult; import org.springframework.transaction.support.TransactionCallbackWithoutResult;
@ -33,7 +45,11 @@ import ca.uhn.fhir.jpa.dao.SearchParameterMap;
import ca.uhn.fhir.jpa.dao.data.ISearchDao; import ca.uhn.fhir.jpa.dao.data.ISearchDao;
import ca.uhn.fhir.jpa.dao.data.ISearchIncludeDao; import ca.uhn.fhir.jpa.dao.data.ISearchIncludeDao;
import ca.uhn.fhir.jpa.dao.data.ISearchResultDao; import ca.uhn.fhir.jpa.dao.data.ISearchResultDao;
import ca.uhn.fhir.jpa.entity.*; import ca.uhn.fhir.jpa.entity.Search;
import ca.uhn.fhir.jpa.entity.SearchInclude;
import ca.uhn.fhir.jpa.entity.SearchResult;
import ca.uhn.fhir.jpa.entity.SearchStatusEnum;
import ca.uhn.fhir.jpa.entity.SearchTypeEnum;
import ca.uhn.fhir.jpa.util.StopWatch; import ca.uhn.fhir.jpa.util.StopWatch;
import ca.uhn.fhir.model.api.Include; import ca.uhn.fhir.model.api.Include;
import ca.uhn.fhir.rest.method.PageMethodBinding; import ca.uhn.fhir.rest.method.PageMethodBinding;
@ -42,7 +58,6 @@ import ca.uhn.fhir.rest.server.SimpleBundleProvider;
import ca.uhn.fhir.rest.server.exceptions.BaseServerResponseException; import ca.uhn.fhir.rest.server.exceptions.BaseServerResponseException;
import ca.uhn.fhir.rest.server.exceptions.InternalErrorException; import ca.uhn.fhir.rest.server.exceptions.InternalErrorException;
import ca.uhn.fhir.rest.server.exceptions.ResourceGoneException; import ca.uhn.fhir.rest.server.exceptions.ResourceGoneException;
import ca.uhn.fhir.util.ObjectUtil;
public class SearchCoordinatorSvcImpl implements ISearchCoordinatorSvc { public class SearchCoordinatorSvcImpl implements ISearchCoordinatorSvc {
static final int DEFAULT_SYNC_SIZE = 250; static final int DEFAULT_SYNC_SIZE = 250;
@ -55,15 +70,21 @@ public class SearchCoordinatorSvcImpl implements ISearchCoordinatorSvc {
private EntityManager myEntityManager; private EntityManager myEntityManager;
private ExecutorService myExecutor; private ExecutorService myExecutor;
private final ConcurrentHashMap<String, SearchTask> myIdToSearchTask = new ConcurrentHashMap<String, SearchTask>(); private final ConcurrentHashMap<String, SearchTask> myIdToSearchTask = new ConcurrentHashMap<String, SearchTask>();
private long myMaxMillisToWaitForRemoteResults = DateUtils.MILLIS_PER_MINUTE; private Integer myLoadingThrottleForUnitTests = null;
private long myMaxMillisToWaitForRemoteResults = DateUtils.MILLIS_PER_MINUTE;
private boolean myNeverUseLocalSearchForUnitTests;
private final List<CountDownLatch> myResultSizeLatch = new ArrayList<CountDownLatch>(); private final List<CountDownLatch> myResultSizeLatch = new ArrayList<CountDownLatch>();
@Autowired @Autowired
private ISearchDao mySearchDao; private ISearchDao mySearchDao;
@Autowired @Autowired
private ISearchIncludeDao mySearchIncludeDao; private ISearchIncludeDao mySearchIncludeDao;
@Autowired @Autowired
private ISearchResultDao mySearchResultDao; private ISearchResultDao mySearchResultDao;
private int mySyncSize = DEFAULT_SYNC_SIZE;
@Autowired @Autowired
private PlatformTransactionManager myTxManager; private PlatformTransactionManager myTxManager;
@ -76,7 +97,7 @@ public class SearchCoordinatorSvcImpl implements ISearchCoordinatorSvc {
} }
@Override @Override
@Transactional(value=TxType.NOT_SUPPORTED) @Transactional(value = TxType.NOT_SUPPORTED)
public List<Long> getResources(final String theUuid, int theFrom, int theTo) { public List<Long> getResources(final String theUuid, int theFrom, int theTo) {
if (myNeverUseLocalSearchForUnitTests == false) { if (myNeverUseLocalSearchForUnitTests == false) {
SearchTask task = myIdToSearchTask.get(theUuid); SearchTask task = myIdToSearchTask.get(theUuid);
@ -85,12 +106,13 @@ public class SearchCoordinatorSvcImpl implements ISearchCoordinatorSvc {
} }
} }
TransactionTemplate txTemplate = new TransactionTemplate(myTxManager);
txTemplate.setPropagationBehavior(TransactionDefinition.PROPAGATION_REQUIRES_NEW);
Search search; Search search;
StopWatch sw = new StopWatch(); StopWatch sw = new StopWatch();
while (true) { while (true) {
TransactionTemplate txTemplate = new TransactionTemplate(myTxManager);
txTemplate.setPropagationBehavior(TransactionTemplate.PROPAGATION_REQUIRES_NEW);
search = txTemplate.execute(new TransactionCallback<Search>() { search = txTemplate.execute(new TransactionCallback<Search>() {
@Override @Override
public Search doInTransaction(TransactionStatus theStatus) { public Search doInTransaction(TransactionStatus theStatus) {
@ -98,7 +120,6 @@ public class SearchCoordinatorSvcImpl implements ISearchCoordinatorSvc {
} }
}); });
if (search == null) { if (search == null) {
ourLog.info("Client requested unknown paging ID[{}]", theUuid); ourLog.info("Client requested unknown paging ID[{}]", theUuid);
String msg = myContext.getLocalizer().getMessage(PageMethodBinding.class, "unknownSearchId", theUuid); String msg = myContext.getLocalizer().getMessage(PageMethodBinding.class, "unknownSearchId", theUuid);
@ -126,17 +147,23 @@ public class SearchCoordinatorSvcImpl implements ISearchCoordinatorSvc {
} }
} }
Pageable page = toPage(theFrom, theTo); final Pageable page = toPage(theFrom, theTo);
if (page == null) { if (page == null) {
return Collections.emptyList(); return Collections.emptyList();
} }
Page<SearchResult> searchResults = mySearchResultDao.findWithSearchUuid(search, page); final Search foundSearch = search;
List<Long> retVal = new ArrayList<Long>();
for (SearchResult next : searchResults) { List<Long> retVal = txTemplate.execute(new TransactionCallback<List<Long>>() {
retVal.add(next.getResourcePid()); @Override
} public List<Long> doInTransaction(TransactionStatus theStatus) {
final List<Long> resultPids = new ArrayList<Long>();
Page<SearchResult> searchResults = mySearchResultDao.findWithSearchUuid(foundSearch, page);
for (SearchResult next : searchResults) {
resultPids.add(next.getResourcePid());
}
return resultPids; }
});
return retVal; return retVal;
} }
@ -152,7 +179,7 @@ public class SearchCoordinatorSvcImpl implements ISearchCoordinatorSvc {
// Load the results synchronously // Load the results synchronously
List<Long> pids = new ArrayList<Long>(); List<Long> pids = new ArrayList<Long>();
Iterator<Long> resultIter = sb.createQuery(theParams); Iterator<Long> resultIter = sb.createQuery(theParams);
while (resultIter.hasNext()) { while (resultIter.hasNext()) {
pids.add(resultIter.next()); pids.add(resultIter.next());
@ -224,11 +251,20 @@ public class SearchCoordinatorSvcImpl implements ISearchCoordinatorSvc {
myEntityManager = theEntityManager; myEntityManager = theEntityManager;
} }
@VisibleForTesting
void setLoadingThrottleForUnitTests(Integer theLoadingThrottleForUnitTests) {
myLoadingThrottleForUnitTests = theLoadingThrottleForUnitTests;
}
@VisibleForTesting @VisibleForTesting
void setMaxMillisToWaitForRemoteResultsForUnitTest(long theMaxMillisToWaitForRemoteResults) { void setMaxMillisToWaitForRemoteResultsForUnitTest(long theMaxMillisToWaitForRemoteResults) {
myMaxMillisToWaitForRemoteResults = theMaxMillisToWaitForRemoteResults; myMaxMillisToWaitForRemoteResults = theMaxMillisToWaitForRemoteResults;
} }
void setNeverUseLocalSearchForUnitTests(boolean theNeverUseLocalSearchForUnitTests) {
myNeverUseLocalSearchForUnitTests = theNeverUseLocalSearchForUnitTests;
}
@VisibleForTesting @VisibleForTesting
void setSearchDaoForUnitTest(ISearchDao theSearchDao) { void setSearchDaoForUnitTest(ISearchDao theSearchDao) {
mySearchDao = theSearchDao; mySearchDao = theSearchDao;
@ -244,47 +280,56 @@ public class SearchCoordinatorSvcImpl implements ISearchCoordinatorSvc {
mySearchResultDao = theSearchResultDao; mySearchResultDao = theSearchResultDao;
} }
@VisibleForTesting
void setSyncSizeForUnitTests(int theSyncSize) {
mySyncSize = theSyncSize;
}
@VisibleForTesting @VisibleForTesting
void setTransactionManagerForUnitTest(PlatformTransactionManager theTxManager) { void setTransactionManagerForUnitTest(PlatformTransactionManager theTxManager) {
myTxManager = theTxManager; myTxManager = theTxManager;
} }
static Pageable toPage(final int theFromIndex, int theToIndex) {
int pageSize = theToIndex - theFromIndex;
if (pageSize < 1) {
return null;
}
int pageIndex = theFromIndex / pageSize;
Pageable page = new PageRequest(pageIndex, pageSize) {
private static final long serialVersionUID = 1L;
@Override
public int getOffset() {
return theFromIndex;
}
};
return page;
}
static void verifySearchHasntFailedOrThrowInternalErrorException(Search theSearch) { static void verifySearchHasntFailedOrThrowInternalErrorException(Search theSearch) {
if (theSearch.getStatus() == SearchStatusEnum.FAILED) { if (theSearch.getStatus() == SearchStatusEnum.FAILED) {
Integer status = theSearch.getFailureCode(); Integer status = theSearch.getFailureCode();
status = ObjectUtils.defaultIfNull(status, 500); status = ObjectUtils.defaultIfNull(status, 500);
String message = theSearch.getFailureMessage(); String message = theSearch.getFailureMessage();
throw BaseServerResponseException.newInstance(status, message); throw BaseServerResponseException.newInstance(status, message);
} }
} }
private int mySyncSize = DEFAULT_SYNC_SIZE;
@VisibleForTesting
void setSyncSizeForUnitTests(int theSyncSize) {
mySyncSize = theSyncSize;
}
@VisibleForTesting
void setLoadingThrottleForUnitTests(Integer theLoadingThrottleForUnitTests) {
myLoadingThrottleForUnitTests = theLoadingThrottleForUnitTests;
}
private Integer myLoadingThrottleForUnitTests = null;
private boolean myNeverUseLocalSearchForUnitTests;
public class SearchTask implements Callable<Void> { public class SearchTask implements Callable<Void> {
private final IDao myCallingDao;
private int myCountSaved = 0; private int myCountSaved = 0;
private final CountDownLatch myInitialCollectionLatch = new CountDownLatch(1); private final CountDownLatch myInitialCollectionLatch = new CountDownLatch(1);
private SearchParameterMap myParams;
private String myResourceType;
private final Search mySearch; private final Search mySearch;
private final ArrayList<Long> mySyncedPids = new ArrayList<Long>(); private final ArrayList<Long> mySyncedPids = new ArrayList<Long>();
private final ArrayList<Long> myUnsyncedPids = new ArrayList<Long>(); private final ArrayList<Long> myUnsyncedPids = new ArrayList<Long>();
private final IDao myCallingDao;
private SearchParameterMap myParams;
private String myResourceType;
public SearchTask(Search theSearch, IDao theCallingDao, SearchParameterMap theParams, String theResourceType) { public SearchTask(Search theSearch, IDao theCallingDao, SearchParameterMap theParams, String theResourceType) {
mySearch = theSearch; mySearch = theSearch;
@ -294,7 +339,7 @@ public class SearchCoordinatorSvcImpl implements ISearchCoordinatorSvc {
} }
public void awaitInitialSync() { public void awaitInitialSync() {
ourLog.info("Awaiting initial sync"); ourLog.trace("Awaiting initial sync");
do { do {
try { try {
if (myInitialCollectionLatch.await(250, TimeUnit.MILLISECONDS)) { if (myInitialCollectionLatch.await(250, TimeUnit.MILLISECONDS)) {
@ -304,7 +349,7 @@ public class SearchCoordinatorSvcImpl implements ISearchCoordinatorSvc {
throw new InternalErrorException(e); throw new InternalErrorException(e);
} }
} while (mySearch.getStatus() == SearchStatusEnum.LOADING); } while (mySearch.getStatus() == SearchStatusEnum.LOADING);
ourLog.info("Initial sync completed"); ourLog.trace("Initial sync completed");
} }
@Override @Override
@ -329,18 +374,18 @@ public class SearchCoordinatorSvcImpl implements ISearchCoordinatorSvc {
Throwable rootCause = ExceptionUtils.getRootCause(t); Throwable rootCause = ExceptionUtils.getRootCause(t);
rootCause = ObjectUtils.defaultIfNull(rootCause, t); rootCause = ObjectUtils.defaultIfNull(rootCause, t);
String failureMessage = rootCause.getMessage(); String failureMessage = rootCause.getMessage();
int failureCode = InternalErrorException.STATUS_CODE; int failureCode = InternalErrorException.STATUS_CODE;
if (t instanceof BaseServerResponseException) { if (t instanceof BaseServerResponseException) {
failureCode = ((BaseServerResponseException) t).getStatusCode(); failureCode = ((BaseServerResponseException) t).getStatusCode();
} }
mySearch.setFailureMessage(failureMessage); mySearch.setFailureMessage(failureMessage);
mySearch.setFailureCode(failureCode); mySearch.setFailureCode(failureCode);
mySearch.setStatus(SearchStatusEnum.FAILED); mySearch.setStatus(SearchStatusEnum.FAILED);
saveSearch(); saveSearch();
} }
@ -349,6 +394,17 @@ public class SearchCoordinatorSvcImpl implements ISearchCoordinatorSvc {
return null; return null;
} }
private void doSaveSearch() {
if (mySearch.getId() == null) {
mySearchDao.save(mySearch);
for (SearchInclude next : mySearch.getIncludes()) {
mySearchIncludeDao.save(next);
}
} else {
mySearchDao.save(mySearch);
}
}
private void doSearch() { private void doSearch() {
Class<? extends IBaseResource> resourceTypeClass = myContext.getResourceDefinition(myResourceType).getImplementingClass(); Class<? extends IBaseResource> resourceTypeClass = myContext.getResourceDefinition(myResourceType).getImplementingClass();
ISearchBuilder sb = myCallingDao.newSearchBuilder(); ISearchBuilder sb = myCallingDao.newSearchBuilder();
@ -371,17 +427,6 @@ public class SearchCoordinatorSvcImpl implements ISearchCoordinatorSvc {
saveUnsynced(theResultIter); saveUnsynced(theResultIter);
} }
private void doSaveSearch() {
if (mySearch.getId() == null) {
mySearchDao.save(mySearch);
for (SearchInclude next : mySearch.getIncludes()) {
mySearchIncludeDao.save(next);
}
} else {
mySearchDao.save(mySearch);
}
}
public List<Long> getResourcePids(int theFromIndex, int theToIndex) { public List<Long> getResourcePids(int theFromIndex, int theToIndex) {
ourLog.info("Requesting search PIDs from {}-{}", theFromIndex, theToIndex); ourLog.info("Requesting search PIDs from {}-{}", theFromIndex, theToIndex);
@ -389,7 +434,7 @@ public class SearchCoordinatorSvcImpl implements ISearchCoordinatorSvc {
synchronized (mySyncedPids) { synchronized (mySyncedPids) {
if (mySyncedPids.size() < theToIndex && mySearch.getStatus() == SearchStatusEnum.LOADING) { if (mySyncedPids.size() < theToIndex && mySearch.getStatus() == SearchStatusEnum.LOADING) {
int latchSize = theToIndex - mySyncedPids.size(); int latchSize = theToIndex - mySyncedPids.size();
ourLog.info("Registering latch to await {} results (want {} total)", latchSize, theToIndex); ourLog.trace("Registering latch to await {} results (want {} total)", latchSize, theToIndex);
latch = new CountDownLatch(latchSize); latch = new CountDownLatch(latchSize);
myResultSizeLatch.add(latch); myResultSizeLatch.add(latch);
} }
@ -455,7 +500,7 @@ public class SearchCoordinatorSvcImpl implements ISearchCoordinatorSvc {
synchronized (mySyncedPids) { synchronized (mySyncedPids) {
int numSyncedThisPass = myUnsyncedPids.size(); int numSyncedThisPass = myUnsyncedPids.size();
ourLog.info("Syncing {} search results", numSyncedThisPass); ourLog.trace("Syncing {} search results", numSyncedThisPass);
mySyncedPids.addAll(myUnsyncedPids); mySyncedPids.addAll(myUnsyncedPids);
myUnsyncedPids.clear(); myUnsyncedPids.clear();
@ -488,28 +533,4 @@ public class SearchCoordinatorSvcImpl implements ISearchCoordinatorSvc {
} }
static Pageable toPage(final int theFromIndex, int theToIndex) {
int pageSize = theToIndex - theFromIndex;
if (pageSize < 1) {
return null;
}
int pageIndex = theFromIndex / pageSize;
Pageable page = new PageRequest(pageIndex, pageSize) {
private static final long serialVersionUID = 1L;
@Override
public int getOffset() {
return theFromIndex;
}
};
return page;
}
void setNeverUseLocalSearchForUnitTests(boolean theNeverUseLocalSearchForUnitTests) {
myNeverUseLocalSearchForUnitTests = theNeverUseLocalSearchForUnitTests;
}
} }