diff --git a/appveyor.yml b/appveyor.yml index d40c830d242..c8a3ee8500a 100644 --- a/appveyor.yml +++ b/appveyor.yml @@ -1,4 +1,4 @@ -version: {build} +version: 1.0.{build} image: Visual Studio 2017 cache: - C:\maven\ diff --git a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/search/SearchCoordinatorSvcImpl.java b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/search/SearchCoordinatorSvcImpl.java index 49950d34dd3..6f2a727e59e 100644 --- a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/search/SearchCoordinatorSvcImpl.java +++ b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/search/SearchCoordinatorSvcImpl.java @@ -19,39 +19,51 @@ package ca.uhn.fhir.jpa.search; * limitations under the License. * #L%family */ -import java.util.*; -import java.util.concurrent.*; - -import javax.persistence.EntityManager; +import ca.uhn.fhir.context.FhirContext; +import ca.uhn.fhir.jpa.dao.DaoConfig; +import ca.uhn.fhir.jpa.dao.IDao; +import ca.uhn.fhir.jpa.dao.ISearchBuilder; +import ca.uhn.fhir.jpa.dao.SearchParameterMap; +import ca.uhn.fhir.jpa.dao.data.ISearchDao; +import ca.uhn.fhir.jpa.dao.data.ISearchIncludeDao; +import ca.uhn.fhir.jpa.dao.data.ISearchResultDao; +import ca.uhn.fhir.jpa.entity.*; +import ca.uhn.fhir.jpa.util.StopWatch; +import ca.uhn.fhir.model.api.Include; import ca.uhn.fhir.rest.api.CacheControlDirective; import ca.uhn.fhir.rest.api.Constants; +import ca.uhn.fhir.rest.api.server.IBundleProvider; +import ca.uhn.fhir.rest.server.SimpleBundleProvider; +import ca.uhn.fhir.rest.server.exceptions.BaseServerResponseException; +import ca.uhn.fhir.rest.server.exceptions.InternalErrorException; +import ca.uhn.fhir.rest.server.exceptions.InvalidRequestException; +import ca.uhn.fhir.rest.server.exceptions.ResourceGoneException; +import ca.uhn.fhir.rest.server.method.PageMethodBinding; +import com.google.common.annotations.VisibleForTesting; +import com.google.common.collect.Lists; import org.apache.commons.lang3.ObjectUtils; import org.apache.commons.lang3.Validate; import org.apache.commons.lang3.exception.ExceptionUtils; import org.apache.commons.lang3.time.DateUtils; import org.hl7.fhir.instance.model.api.IBaseResource; import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.data.domain.*; +import org.springframework.data.domain.Page; +import org.springframework.data.domain.PageRequest; +import org.springframework.data.domain.Pageable; import org.springframework.scheduling.concurrent.CustomizableThreadFactory; -import org.springframework.transaction.*; +import org.springframework.transaction.PlatformTransactionManager; +import org.springframework.transaction.TransactionDefinition; +import org.springframework.transaction.TransactionStatus; import org.springframework.transaction.annotation.Propagation; import org.springframework.transaction.annotation.Transactional; -import org.springframework.transaction.support.*; +import org.springframework.transaction.support.TransactionCallback; +import org.springframework.transaction.support.TransactionCallbackWithoutResult; +import org.springframework.transaction.support.TransactionTemplate; -import com.google.common.annotations.VisibleForTesting; -import com.google.common.collect.Lists; - -import ca.uhn.fhir.context.FhirContext; -import ca.uhn.fhir.jpa.dao.*; -import ca.uhn.fhir.jpa.dao.data.*; -import ca.uhn.fhir.jpa.entity.*; -import ca.uhn.fhir.jpa.util.StopWatch; -import ca.uhn.fhir.model.api.Include; -import ca.uhn.fhir.rest.api.server.IBundleProvider; -import ca.uhn.fhir.rest.server.SimpleBundleProvider; -import ca.uhn.fhir.rest.server.exceptions.*; -import ca.uhn.fhir.rest.server.method.PageMethodBinding; +import javax.persistence.EntityManager; +import java.util.*; +import java.util.concurrent.*; public class SearchCoordinatorSvcImpl implements ISearchCoordinatorSvc { public static final int DEFAULT_SYNC_SIZE = 250; @@ -420,6 +432,19 @@ public class SearchCoordinatorSvcImpl implements ISearchCoordinatorSvc { } } + /** + * A search task is a Callable task that runs in + * a thread pool to handle an individual search. One instance + * is created for any requested search and runs from the + * beginning to the end of the search. + * + * Understand: + * This class executes in its own thread separate from the + * web server client thread that made the request. We do that + * so that we can return to the client as soon as possible, + * but keep the search going in the background (and have + * the next page of results ready to go when the client asks). + */ public class SearchTask implements Callable { private final IDao myCallingDao; @@ -434,6 +459,9 @@ public class SearchCoordinatorSvcImpl implements ISearchCoordinatorSvc { private int myCountSaved = 0; private String mySearchUuid; + /** + * Constructor + */ public SearchTask(Search theSearch, IDao theCallingDao, SearchParameterMap theParams, String theResourceType, String theSearchUuid) { mySearch = theSearch; myCallingDao = theCallingDao; @@ -443,6 +471,11 @@ public class SearchCoordinatorSvcImpl implements ISearchCoordinatorSvc { mySearchUuid = theSearchUuid; } + /** + * This method is called by the server HTTP thread, and + * will block until at least one page of results have been + * fetched from the DB, and will never block after that. + */ public Integer awaitInitialSync() { ourLog.trace("Awaiting initial sync"); do { @@ -451,6 +484,7 @@ public class SearchCoordinatorSvcImpl implements ISearchCoordinatorSvc { break; } } catch (InterruptedException e) { + // Shouldn't happen throw new InternalErrorException(e); } } while (mySearch.getStatus() == SearchStatusEnum.LOADING); @@ -459,11 +493,16 @@ public class SearchCoordinatorSvcImpl implements ISearchCoordinatorSvc { return mySearch.getTotalCount(); } + /** + * This is the method which actually performs the search. + * It is called automatically by the thread pool. + */ @Override public Void call() throws Exception { StopWatch sw = new StopWatch(); try { + // Create an initial search in the DB and give it an ID saveSearch(); TransactionTemplate txTemplate = new TransactionTemplate(myManagedTxManager); @@ -480,7 +519,9 @@ public class SearchCoordinatorSvcImpl implements ISearchCoordinatorSvc { } catch (Throwable t) { /* - * Don't print a stack trace for client errors.. that's just noisy + * Don't print a stack trace for client errors (i.e. requests that + * aren't valid because the client screwed up).. that's just noise + * in the logs and who needs that. */ boolean logged = false; if (t instanceof BaseServerResponseException) { @@ -535,13 +576,27 @@ public class SearchCoordinatorSvcImpl implements ISearchCoordinatorSvc { Class resourceTypeClass = myContext.getResourceDefinition(myResourceType).getImplementingClass(); ISearchBuilder sb = myCallingDao.newSearchBuilder(); sb.setType(resourceTypeClass, myResourceType); - Iterator theResultIter = sb.createQuery(myParams, mySearchUuid); + Iterator theResultIterator = sb.createQuery(myParams, mySearchUuid); - while (theResultIter.hasNext()) { - myUnsyncedPids.add(theResultIter.next()); - if (myUnsyncedPids.size() >= mySyncSize) { - saveUnsynced(theResultIter); + while (theResultIterator.hasNext()) { + myUnsyncedPids.add(theResultIterator.next()); + + boolean shouldSync = myUnsyncedPids.size() >= mySyncSize; + + if (myDaoConfig.getCountSearchResultsUpTo() != null && + myDaoConfig.getCountSearchResultsUpTo() > 0 && + myDaoConfig.getCountSearchResultsUpTo() < myUnsyncedPids.size()) { + shouldSync = false; } + + if (myUnsyncedPids.size() > 50000) { + shouldSync = true; + } + + if (shouldSync) { + saveUnsynced(theResultIterator); + } + if (myLoadingThrottleForUnitTests != null) { try { Thread.sleep(myLoadingThrottleForUnitTests); @@ -549,9 +604,12 @@ public class SearchCoordinatorSvcImpl implements ISearchCoordinatorSvc { // ignore } } + + // Check if an abort got requested Validate.isTrue(myAbortRequested == false, "Abort has been requested"); + } - saveUnsynced(theResultIter); + saveUnsynced(theResultIterator); } public CountDownLatch getCompletionLatch() { @@ -648,22 +706,25 @@ public class SearchCoordinatorSvcImpl implements ISearchCoordinatorSvc { mySearch.setStatus(SearchStatusEnum.FINISHED); } } + mySearch.setNumFound(myCountSaved); + + int numSynced; + synchronized (mySyncedPids) { + numSynced = mySyncedPids.size(); + } + + if (myDaoConfig.getCountSearchResultsUpTo() == null || + myDaoConfig.getCountSearchResultsUpTo() <= 0 || + myDaoConfig.getCountSearchResultsUpTo() <= numSynced) { + myInitialCollectionLatch.countDown(); + } + doSaveSearch(); } }); - int numSynced; - synchronized (mySyncedPids) { - numSynced = mySyncedPids.size(); - } - - if (myDaoConfig.getCountSearchResultsUpTo() == null || - myDaoConfig.getCountSearchResultsUpTo() <= 0 || - myDaoConfig.getCountSearchResultsUpTo() <= numSynced) { - myInitialCollectionLatch.countDown(); - } } }