Streamline search coordinator to improve performance

This commit is contained in:
James Agnew 2017-11-15 18:31:03 +01:00
parent d5425fba5e
commit 0d0300623f
2 changed files with 99 additions and 38 deletions

View File

@ -1,4 +1,4 @@
version: {build} version: 1.0.{build}
image: Visual Studio 2017 image: Visual Studio 2017
cache: cache:
- C:\maven\ - C:\maven\

View File

@ -19,39 +19,51 @@ package ca.uhn.fhir.jpa.search;
* limitations under the License. * limitations under the License.
* #L%family * #L%family
*/ */
import java.util.*;
import java.util.concurrent.*;
import javax.persistence.EntityManager;
import ca.uhn.fhir.context.FhirContext;
import ca.uhn.fhir.jpa.dao.DaoConfig;
import ca.uhn.fhir.jpa.dao.IDao;
import ca.uhn.fhir.jpa.dao.ISearchBuilder;
import ca.uhn.fhir.jpa.dao.SearchParameterMap;
import ca.uhn.fhir.jpa.dao.data.ISearchDao;
import ca.uhn.fhir.jpa.dao.data.ISearchIncludeDao;
import ca.uhn.fhir.jpa.dao.data.ISearchResultDao;
import ca.uhn.fhir.jpa.entity.*;
import ca.uhn.fhir.jpa.util.StopWatch;
import ca.uhn.fhir.model.api.Include;
import ca.uhn.fhir.rest.api.CacheControlDirective; import ca.uhn.fhir.rest.api.CacheControlDirective;
import ca.uhn.fhir.rest.api.Constants; import ca.uhn.fhir.rest.api.Constants;
import ca.uhn.fhir.rest.api.server.IBundleProvider;
import ca.uhn.fhir.rest.server.SimpleBundleProvider;
import ca.uhn.fhir.rest.server.exceptions.BaseServerResponseException;
import ca.uhn.fhir.rest.server.exceptions.InternalErrorException;
import ca.uhn.fhir.rest.server.exceptions.InvalidRequestException;
import ca.uhn.fhir.rest.server.exceptions.ResourceGoneException;
import ca.uhn.fhir.rest.server.method.PageMethodBinding;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Lists;
import org.apache.commons.lang3.ObjectUtils; import org.apache.commons.lang3.ObjectUtils;
import org.apache.commons.lang3.Validate; import org.apache.commons.lang3.Validate;
import org.apache.commons.lang3.exception.ExceptionUtils; import org.apache.commons.lang3.exception.ExceptionUtils;
import org.apache.commons.lang3.time.DateUtils; import org.apache.commons.lang3.time.DateUtils;
import org.hl7.fhir.instance.model.api.IBaseResource; import org.hl7.fhir.instance.model.api.IBaseResource;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.domain.*; import org.springframework.data.domain.Page;
import org.springframework.data.domain.PageRequest;
import org.springframework.data.domain.Pageable;
import org.springframework.scheduling.concurrent.CustomizableThreadFactory; import org.springframework.scheduling.concurrent.CustomizableThreadFactory;
import org.springframework.transaction.*; import org.springframework.transaction.PlatformTransactionManager;
import org.springframework.transaction.TransactionDefinition;
import org.springframework.transaction.TransactionStatus;
import org.springframework.transaction.annotation.Propagation; import org.springframework.transaction.annotation.Propagation;
import org.springframework.transaction.annotation.Transactional; import org.springframework.transaction.annotation.Transactional;
import org.springframework.transaction.support.*; import org.springframework.transaction.support.TransactionCallback;
import org.springframework.transaction.support.TransactionCallbackWithoutResult;
import org.springframework.transaction.support.TransactionTemplate;
import com.google.common.annotations.VisibleForTesting; import javax.persistence.EntityManager;
import com.google.common.collect.Lists; import java.util.*;
import java.util.concurrent.*;
import ca.uhn.fhir.context.FhirContext;
import ca.uhn.fhir.jpa.dao.*;
import ca.uhn.fhir.jpa.dao.data.*;
import ca.uhn.fhir.jpa.entity.*;
import ca.uhn.fhir.jpa.util.StopWatch;
import ca.uhn.fhir.model.api.Include;
import ca.uhn.fhir.rest.api.server.IBundleProvider;
import ca.uhn.fhir.rest.server.SimpleBundleProvider;
import ca.uhn.fhir.rest.server.exceptions.*;
import ca.uhn.fhir.rest.server.method.PageMethodBinding;
public class SearchCoordinatorSvcImpl implements ISearchCoordinatorSvc { public class SearchCoordinatorSvcImpl implements ISearchCoordinatorSvc {
public static final int DEFAULT_SYNC_SIZE = 250; public static final int DEFAULT_SYNC_SIZE = 250;
@ -420,6 +432,19 @@ public class SearchCoordinatorSvcImpl implements ISearchCoordinatorSvc {
} }
} }
/**
* A search task is a Callable task that runs in
* a thread pool to handle an individual search. One instance
* is created for any requested search and runs from the
* beginning to the end of the search.
*
* Understand:
* This class executes in its own thread separate from the
* web server client thread that made the request. We do that
* so that we can return to the client as soon as possible,
* but keep the search going in the background (and have
* the next page of results ready to go when the client asks).
*/
public class SearchTask implements Callable<Void> { public class SearchTask implements Callable<Void> {
private final IDao myCallingDao; private final IDao myCallingDao;
@ -434,6 +459,9 @@ public class SearchCoordinatorSvcImpl implements ISearchCoordinatorSvc {
private int myCountSaved = 0; private int myCountSaved = 0;
private String mySearchUuid; private String mySearchUuid;
/**
* Constructor
*/
public SearchTask(Search theSearch, IDao theCallingDao, SearchParameterMap theParams, String theResourceType, String theSearchUuid) { public SearchTask(Search theSearch, IDao theCallingDao, SearchParameterMap theParams, String theResourceType, String theSearchUuid) {
mySearch = theSearch; mySearch = theSearch;
myCallingDao = theCallingDao; myCallingDao = theCallingDao;
@ -443,6 +471,11 @@ public class SearchCoordinatorSvcImpl implements ISearchCoordinatorSvc {
mySearchUuid = theSearchUuid; mySearchUuid = theSearchUuid;
} }
/**
* This method is called by the server HTTP thread, and
* will block until at least one page of results have been
* fetched from the DB, and will never block after that.
*/
public Integer awaitInitialSync() { public Integer awaitInitialSync() {
ourLog.trace("Awaiting initial sync"); ourLog.trace("Awaiting initial sync");
do { do {
@ -451,6 +484,7 @@ public class SearchCoordinatorSvcImpl implements ISearchCoordinatorSvc {
break; break;
} }
} catch (InterruptedException e) { } catch (InterruptedException e) {
// Shouldn't happen
throw new InternalErrorException(e); throw new InternalErrorException(e);
} }
} while (mySearch.getStatus() == SearchStatusEnum.LOADING); } while (mySearch.getStatus() == SearchStatusEnum.LOADING);
@ -459,11 +493,16 @@ public class SearchCoordinatorSvcImpl implements ISearchCoordinatorSvc {
return mySearch.getTotalCount(); return mySearch.getTotalCount();
} }
/**
* This is the method which actually performs the search.
* It is called automatically by the thread pool.
*/
@Override @Override
public Void call() throws Exception { public Void call() throws Exception {
StopWatch sw = new StopWatch(); StopWatch sw = new StopWatch();
try { try {
// Create an initial search in the DB and give it an ID
saveSearch(); saveSearch();
TransactionTemplate txTemplate = new TransactionTemplate(myManagedTxManager); TransactionTemplate txTemplate = new TransactionTemplate(myManagedTxManager);
@ -480,7 +519,9 @@ public class SearchCoordinatorSvcImpl implements ISearchCoordinatorSvc {
} catch (Throwable t) { } catch (Throwable t) {
/* /*
* Don't print a stack trace for client errors.. that's just noisy * Don't print a stack trace for client errors (i.e. requests that
* aren't valid because the client screwed up).. that's just noise
* in the logs and who needs that.
*/ */
boolean logged = false; boolean logged = false;
if (t instanceof BaseServerResponseException) { if (t instanceof BaseServerResponseException) {
@ -535,13 +576,27 @@ public class SearchCoordinatorSvcImpl implements ISearchCoordinatorSvc {
Class<? extends IBaseResource> resourceTypeClass = myContext.getResourceDefinition(myResourceType).getImplementingClass(); Class<? extends IBaseResource> resourceTypeClass = myContext.getResourceDefinition(myResourceType).getImplementingClass();
ISearchBuilder sb = myCallingDao.newSearchBuilder(); ISearchBuilder sb = myCallingDao.newSearchBuilder();
sb.setType(resourceTypeClass, myResourceType); sb.setType(resourceTypeClass, myResourceType);
Iterator<Long> theResultIter = sb.createQuery(myParams, mySearchUuid); Iterator<Long> theResultIterator = sb.createQuery(myParams, mySearchUuid);
while (theResultIter.hasNext()) { while (theResultIterator.hasNext()) {
myUnsyncedPids.add(theResultIter.next()); myUnsyncedPids.add(theResultIterator.next());
if (myUnsyncedPids.size() >= mySyncSize) {
saveUnsynced(theResultIter); boolean shouldSync = myUnsyncedPids.size() >= mySyncSize;
if (myDaoConfig.getCountSearchResultsUpTo() != null &&
myDaoConfig.getCountSearchResultsUpTo() > 0 &&
myDaoConfig.getCountSearchResultsUpTo() < myUnsyncedPids.size()) {
shouldSync = false;
} }
if (myUnsyncedPids.size() > 50000) {
shouldSync = true;
}
if (shouldSync) {
saveUnsynced(theResultIterator);
}
if (myLoadingThrottleForUnitTests != null) { if (myLoadingThrottleForUnitTests != null) {
try { try {
Thread.sleep(myLoadingThrottleForUnitTests); Thread.sleep(myLoadingThrottleForUnitTests);
@ -549,9 +604,12 @@ public class SearchCoordinatorSvcImpl implements ISearchCoordinatorSvc {
// ignore // ignore
} }
} }
// Check if an abort got requested
Validate.isTrue(myAbortRequested == false, "Abort has been requested"); Validate.isTrue(myAbortRequested == false, "Abort has been requested");
} }
saveUnsynced(theResultIter); saveUnsynced(theResultIterator);
} }
public CountDownLatch getCompletionLatch() { public CountDownLatch getCompletionLatch() {
@ -648,22 +706,25 @@ public class SearchCoordinatorSvcImpl implements ISearchCoordinatorSvc {
mySearch.setStatus(SearchStatusEnum.FINISHED); mySearch.setStatus(SearchStatusEnum.FINISHED);
} }
} }
mySearch.setNumFound(myCountSaved); mySearch.setNumFound(myCountSaved);
int numSynced;
synchronized (mySyncedPids) {
numSynced = mySyncedPids.size();
}
if (myDaoConfig.getCountSearchResultsUpTo() == null ||
myDaoConfig.getCountSearchResultsUpTo() <= 0 ||
myDaoConfig.getCountSearchResultsUpTo() <= numSynced) {
myInitialCollectionLatch.countDown();
}
doSaveSearch(); doSaveSearch();
} }
}); });
int numSynced;
synchronized (mySyncedPids) {
numSynced = mySyncedPids.size();
}
if (myDaoConfig.getCountSearchResultsUpTo() == null ||
myDaoConfig.getCountSearchResultsUpTo() <= 0 ||
myDaoConfig.getCountSearchResultsUpTo() <= numSynced) {
myInitialCollectionLatch.countDown();
}
} }
} }