Partition data leaks across searches (#2220)

* Partition data leaks across searches

* Add changelog

* Resolve FIXME

* Add test logging

* Test fix

* Test fix
This commit is contained in:
James Agnew 2020-12-06 16:32:07 -05:00 committed by GitHub
parent 6799973016
commit d958764d49
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
31 changed files with 675 additions and 372 deletions

View File

@ -284,7 +284,7 @@ public class RequestPartitionId {
* Returns the partition IDs (numeric) as a joined string with a space between, using the string "null" for any null values
*/
public static String stringifyForKey(@Nonnull RequestPartitionId theRequestPartitionId) {
String retVal = "(all partitions)";
String retVal = "(all)";
if (!theRequestPartitionId.isAllPartitions()) {
assert theRequestPartitionId.hasPartitionIds();
retVal = theRequestPartitionId

View File

@ -0,0 +1,7 @@
---
type: security
issue: 2220
title: "An important security issue with the JPA Server was solved. This issue applies only to JPA servers running
in partitioned mode. When performing searches on a partitioned server, search results from previously cached
searches against different partitions may be returned, potentially leaking data across partitions. This issue
has been resolved."

View File

@ -2115,6 +2115,9 @@ public class DaoConfig {
DISABLED
}
/**
* This enum provides allowable options for {@link #setResourceServerIdStrategy(IdStrategyEnum)}
*/
public enum IdStrategyEnum {
/**
* This strategy is the default strategy, and it simply uses a sequential
@ -2127,6 +2130,9 @@ public class DaoConfig {
UUID
}
/**
* This enum provides allowable options for {@link #setResourceClientIdStrategy(ClientIdStrategyEnum)}
*/
public enum ClientIdStrategyEnum {
/**
* Clients are not allowed to supply IDs for resources that do not
@ -2152,7 +2158,7 @@ public class DaoConfig {
* to {@link IdStrategyEnum#UUID} in order to avoid any potential for conflicts. Otherwise
* a database sequence will be used to generate IDs and these IDs can conflict with
* client-assigned numeric IDs.
* </P>
* </p>
*/
ANY
}

View File

@ -20,6 +20,7 @@ package ca.uhn.fhir.jpa.api.svc;
* #L%
*/
import ca.uhn.fhir.interceptor.model.RequestPartitionId;
import ca.uhn.fhir.jpa.api.dao.IFhirResourceDao;
import ca.uhn.fhir.rest.api.server.storage.ResourcePersistentId;
import ca.uhn.fhir.jpa.searchparam.SearchParameterMap;
@ -37,7 +38,7 @@ public interface ISearchCoordinatorSvc {
List<ResourcePersistentId> getResources(String theUuid, int theFrom, int theTo, @Nullable RequestDetails theRequestDetails);
IBundleProvider registerSearch(IFhirResourceDao<?> theCallingDao, SearchParameterMap theParams, String theResourceType, CacheControlDirective theCacheControlDirective, @Nullable RequestDetails theRequestDetails);
IBundleProvider registerSearch(IFhirResourceDao<?> theCallingDao, SearchParameterMap theParams, String theResourceType, CacheControlDirective theCacheControlDirective, @Nullable RequestDetails theRequestDetails, RequestPartitionId theRequestPartitionId);
/**
* Fetch the total number of search results for the given currently executing search, if one is currently executing and

View File

@ -51,6 +51,7 @@ import ca.uhn.fhir.jpa.patch.JsonPatchUtils;
import ca.uhn.fhir.jpa.patch.XmlPatchUtils;
import ca.uhn.fhir.jpa.search.DatabaseBackedPagingProvider;
import ca.uhn.fhir.jpa.search.PersistedJpaBundleProvider;
import ca.uhn.fhir.jpa.search.cache.SearchCacheStatusEnum;
import ca.uhn.fhir.jpa.search.reindex.IResourceReindexingSvc;
import ca.uhn.fhir.jpa.searchparam.MatchUrlService;
import ca.uhn.fhir.jpa.searchparam.SearchParameterMap;
@ -72,6 +73,8 @@ import ca.uhn.fhir.rest.api.server.SimplePreResourceAccessDetails;
import ca.uhn.fhir.rest.api.server.SimplePreResourceShowDetails;
import ca.uhn.fhir.rest.api.server.storage.ResourcePersistentId;
import ca.uhn.fhir.rest.api.server.storage.TransactionDetails;
import ca.uhn.fhir.rest.server.IPagingProvider;
import ca.uhn.fhir.rest.server.IRestfulServerDefaults;
import ca.uhn.fhir.rest.server.RestfulServerUtils;
import ca.uhn.fhir.rest.server.exceptions.InvalidRequestException;
import ca.uhn.fhir.rest.server.exceptions.MethodNotAllowedException;
@ -162,6 +165,8 @@ public abstract class BaseHapiFhirResourceDao<T extends IBaseResource> extends B
private IInstanceValidatorModule myInstanceValidator;
private String myResourceName;
private Class<T> myResourceType;
@Autowired
private IRequestPartitionHelperSvc myPartitionHelperSvc;
@Override
@Transactional
@ -777,7 +782,9 @@ public abstract class BaseHapiFhirResourceDao<T extends IBaseResource> extends B
if (theRequestDetails == null || theRequestDetails.getServer() == null) {
return false;
}
return theRequestDetails.getServer().getPagingProvider() instanceof DatabaseBackedPagingProvider;
IRestfulServerDefaults server = theRequestDetails.getServer();
IPagingProvider pagingProvider = server.getPagingProvider();
return pagingProvider instanceof DatabaseBackedPagingProvider;
}
protected void markResourcesMatchingExpressionAsNeedingReindexing(Boolean theCurrentlyReindexing, String theExpression) {
@ -1294,11 +1301,12 @@ public abstract class BaseHapiFhirResourceDao<T extends IBaseResource> extends B
cacheControlDirective.parse(theRequest.getHeaders(Constants.HEADER_CACHE_CONTROL));
}
IBundleProvider retVal = mySearchCoordinatorSvc.registerSearch(this, theParams, getResourceName(), cacheControlDirective, theRequest);
RequestPartitionId requestPartitionId = myPartitionHelperSvc.determineReadPartitionForRequest(theRequest, getResourceName());
IBundleProvider retVal = mySearchCoordinatorSvc.registerSearch(this, theParams, getResourceName(), cacheControlDirective, theRequest, requestPartitionId);
if (retVal instanceof PersistedJpaBundleProvider) {
PersistedJpaBundleProvider provider = (PersistedJpaBundleProvider) retVal;
if (provider.isCacheHit()) {
if (provider.getCacheStatus() == SearchCacheStatusEnum.HIT) {
if (theServletResponse != null && theRequest != null) {
String value = "HIT from " + theRequest.getFhirServerBase();
theServletResponse.addHeader(Constants.HEADER_X_CACHE, value);

View File

@ -20,7 +20,9 @@ package ca.uhn.fhir.jpa.dao;
* #L%
*/
import ca.uhn.fhir.interceptor.model.RequestPartitionId;
import ca.uhn.fhir.jpa.api.dao.IFhirResourceDaoPatient;
import ca.uhn.fhir.jpa.partition.IRequestPartitionHelperSvc;
import ca.uhn.fhir.jpa.searchparam.SearchParameterMap;
import ca.uhn.fhir.jpa.searchparam.SearchParameterMap.EverythingModeEnum;
import ca.uhn.fhir.model.api.IResource;
@ -37,12 +39,16 @@ import ca.uhn.fhir.rest.param.StringParam;
import ca.uhn.fhir.rest.server.interceptor.IServerInterceptor.ActionRequestDetails;
import org.hl7.fhir.instance.model.api.IIdType;
import org.hl7.fhir.instance.model.api.IPrimitiveType;
import org.springframework.beans.factory.annotation.Autowired;
import javax.servlet.http.HttpServletRequest;
import java.util.Collections;
public class FhirResourceDaoPatientDstu2 extends BaseHapiFhirResourceDao<Patient>implements IFhirResourceDaoPatient<Patient> {
@Autowired
private IRequestPartitionHelperSvc myPartitionHelperSvc;
public FhirResourceDaoPatientDstu2() {
super();
}
@ -73,7 +79,8 @@ public class FhirResourceDaoPatientDstu2 extends BaseHapiFhirResourceDao<Patient
paramMap.setLoadSynchronous(true);
}
return mySearchCoordinatorSvc.registerSearch(this, paramMap, getResourceName(), new CacheControlDirective().parse(theRequest.getHeaders(Constants.HEADER_CACHE_CONTROL)), theRequest);
RequestPartitionId requestPartitionId = myPartitionHelperSvc.determineReadPartitionForRequest(theRequest, getResourceName());
return mySearchCoordinatorSvc.registerSearch(this, paramMap, getResourceName(), new CacheControlDirective().parse(theRequest.getHeaders(Constants.HEADER_CACHE_CONTROL)), theRequest, requestPartitionId);
}
@Override

View File

@ -20,9 +20,11 @@ package ca.uhn.fhir.jpa.dao.dstu3;
* #L%
*/
import ca.uhn.fhir.interceptor.model.RequestPartitionId;
import ca.uhn.fhir.jpa.dao.BaseHapiFhirResourceDaoObservation;
import ca.uhn.fhir.jpa.model.cross.IBasePersistedResource;
import ca.uhn.fhir.jpa.model.entity.ResourceTable;
import ca.uhn.fhir.jpa.partition.IRequestPartitionHelperSvc;
import ca.uhn.fhir.jpa.searchparam.SearchParameterMap;
import ca.uhn.fhir.rest.api.CacheControlDirective;
import ca.uhn.fhir.rest.api.Constants;
@ -31,18 +33,23 @@ import ca.uhn.fhir.rest.api.server.RequestDetails;
import ca.uhn.fhir.rest.api.server.storage.TransactionDetails;
import org.hl7.fhir.instance.model.api.IBaseResource;
import org.hl7.fhir.dstu3.model.Observation;
import org.springframework.beans.factory.annotation.Autowired;
import javax.servlet.http.HttpServletResponse;
import java.util.Date;
public class FhirResourceDaoObservationDstu3 extends BaseHapiFhirResourceDaoObservation<Observation> {
@Autowired
private IRequestPartitionHelperSvc myPartitionHelperSvc;
@Override
public IBundleProvider observationsLastN(SearchParameterMap theSearchParameterMap, RequestDetails theRequestDetails, HttpServletResponse theServletResponse) {
updateSearchParamsForLastn(theSearchParameterMap, theRequestDetails);
return mySearchCoordinatorSvc.registerSearch(this, theSearchParameterMap, getResourceName(), new CacheControlDirective().parse(theRequestDetails.getHeaders(Constants.HEADER_CACHE_CONTROL)), theRequestDetails);
RequestPartitionId requestPartitionId = myPartitionHelperSvc.determineReadPartitionForRequest(theRequestDetails, getResourceName());
return mySearchCoordinatorSvc.registerSearch(this, theSearchParameterMap, getResourceName(), new CacheControlDirective().parse(theRequestDetails.getHeaders(Constants.HEADER_CACHE_CONTROL)), theRequestDetails, requestPartitionId);
}
@Override

View File

@ -20,8 +20,10 @@ package ca.uhn.fhir.jpa.dao.dstu3;
* #L%
*/
import ca.uhn.fhir.interceptor.model.RequestPartitionId;
import ca.uhn.fhir.jpa.api.dao.IFhirResourceDaoPatient;
import ca.uhn.fhir.jpa.dao.BaseHapiFhirResourceDao;
import ca.uhn.fhir.jpa.partition.IRequestPartitionHelperSvc;
import ca.uhn.fhir.jpa.searchparam.SearchParameterMap;
import ca.uhn.fhir.jpa.searchparam.SearchParameterMap.EverythingModeEnum;
import ca.uhn.fhir.model.api.IResource;
@ -36,12 +38,16 @@ import ca.uhn.fhir.rest.param.StringParam;
import org.hl7.fhir.dstu3.model.Patient;
import org.hl7.fhir.instance.model.api.IIdType;
import org.hl7.fhir.instance.model.api.IPrimitiveType;
import org.springframework.beans.factory.annotation.Autowired;
import javax.servlet.http.HttpServletRequest;
import java.util.Collections;
public class FhirResourceDaoPatientDstu3 extends BaseHapiFhirResourceDao<Patient>implements IFhirResourceDaoPatient<Patient> {
@Autowired
private IRequestPartitionHelperSvc myPartitionHelperSvc;
private IBundleProvider doEverythingOperation(IIdType theId, IPrimitiveType<Integer> theCount, IPrimitiveType<Integer> theOffset, DateRangeParam theLastUpdated, SortSpec theSort, StringAndListParam theContent, StringAndListParam theNarrative, StringAndListParam theFilter, RequestDetails theRequest) {
SearchParameterMap paramMap = new SearchParameterMap();
if (theCount != null) {
@ -68,7 +74,8 @@ public class FhirResourceDaoPatientDstu3 extends BaseHapiFhirResourceDao<Patient
paramMap.setLoadSynchronous(true);
}
return mySearchCoordinatorSvc.registerSearch(this, paramMap, getResourceName(), new CacheControlDirective().parse(theRequest.getHeaders(Constants.HEADER_CACHE_CONTROL)), theRequest);
RequestPartitionId requestPartitionId = myPartitionHelperSvc.determineReadPartitionForRequest(theRequest, getResourceName());
return mySearchCoordinatorSvc.registerSearch(this, paramMap, getResourceName(), new CacheControlDirective().parse(theRequest.getHeaders(Constants.HEADER_CACHE_CONTROL)), theRequest, requestPartitionId);
}
@Override

View File

@ -20,9 +20,11 @@ package ca.uhn.fhir.jpa.dao.r4;
* #L%
*/
import ca.uhn.fhir.interceptor.model.RequestPartitionId;
import ca.uhn.fhir.jpa.dao.BaseHapiFhirResourceDaoObservation;
import ca.uhn.fhir.jpa.model.cross.IBasePersistedResource;
import ca.uhn.fhir.jpa.model.entity.ResourceTable;
import ca.uhn.fhir.jpa.partition.IRequestPartitionHelperSvc;
import ca.uhn.fhir.jpa.searchparam.SearchParameterMap;
import ca.uhn.fhir.rest.api.CacheControlDirective;
import ca.uhn.fhir.rest.api.Constants;
@ -31,6 +33,7 @@ import ca.uhn.fhir.rest.api.server.RequestDetails;
import ca.uhn.fhir.rest.api.server.storage.TransactionDetails;
import org.hl7.fhir.instance.model.api.IBaseResource;
import org.hl7.fhir.r4.model.Observation;
import org.springframework.beans.factory.annotation.Autowired;
import javax.persistence.EntityManager;
import javax.persistence.PersistenceContext;
@ -43,13 +46,15 @@ public class FhirResourceDaoObservationR4 extends BaseHapiFhirResourceDaoObserva
@PersistenceContext(type = PersistenceContextType.TRANSACTION)
protected EntityManager myEntityManager;
@Autowired
private IRequestPartitionHelperSvc myRequestPartitionHelperService;
@Override
public IBundleProvider observationsLastN(SearchParameterMap theSearchParameterMap, RequestDetails theRequestDetails, HttpServletResponse theServletResponse) {
updateSearchParamsForLastn(theSearchParameterMap, theRequestDetails);
return mySearchCoordinatorSvc.registerSearch(this, theSearchParameterMap, getResourceName(), new CacheControlDirective().parse(theRequestDetails.getHeaders(Constants.HEADER_CACHE_CONTROL)), theRequestDetails);
RequestPartitionId requestPartitionId = myRequestPartitionHelperService.determineReadPartitionForRequest(theRequestDetails, getResourceName());
return mySearchCoordinatorSvc.registerSearch(this, theSearchParameterMap, getResourceName(), new CacheControlDirective().parse(theRequestDetails.getHeaders(Constants.HEADER_CACHE_CONTROL)), theRequestDetails, requestPartitionId);
}
@Override

View File

@ -20,8 +20,10 @@ package ca.uhn.fhir.jpa.dao.r4;
* #L%
*/
import ca.uhn.fhir.interceptor.model.RequestPartitionId;
import ca.uhn.fhir.jpa.api.dao.IFhirResourceDaoPatient;
import ca.uhn.fhir.jpa.dao.BaseHapiFhirResourceDao;
import ca.uhn.fhir.jpa.partition.IRequestPartitionHelperSvc;
import ca.uhn.fhir.jpa.searchparam.SearchParameterMap;
import ca.uhn.fhir.jpa.searchparam.SearchParameterMap.EverythingModeEnum;
import ca.uhn.fhir.model.api.IResource;
@ -36,12 +38,16 @@ import ca.uhn.fhir.rest.param.StringParam;
import org.hl7.fhir.instance.model.api.IIdType;
import org.hl7.fhir.instance.model.api.IPrimitiveType;
import org.hl7.fhir.r4.model.Patient;
import org.springframework.beans.factory.annotation.Autowired;
import javax.servlet.http.HttpServletRequest;
import java.util.Collections;
public class FhirResourceDaoPatientR4 extends BaseHapiFhirResourceDao<Patient>implements IFhirResourceDaoPatient<Patient> {
@Autowired
private IRequestPartitionHelperSvc myPartitionHelperSvc;
private IBundleProvider doEverythingOperation(IIdType theId, IPrimitiveType<Integer> theCount, IPrimitiveType<Integer> theOffset, DateRangeParam theLastUpdated, SortSpec theSort, StringAndListParam theContent, StringAndListParam theNarrative, StringAndListParam theFilter, RequestDetails theRequest) {
SearchParameterMap paramMap = new SearchParameterMap();
if (theCount != null) {
@ -68,7 +74,8 @@ public class FhirResourceDaoPatientR4 extends BaseHapiFhirResourceDao<Patient>im
paramMap.setLoadSynchronous(true);
}
return mySearchCoordinatorSvc.registerSearch(this, paramMap, getResourceName(), new CacheControlDirective().parse(theRequest.getHeaders(Constants.HEADER_CACHE_CONTROL)), theRequest);
RequestPartitionId requestPartitionId = myPartitionHelperSvc.determineReadPartitionForRequest(theRequest, getResourceName());
return mySearchCoordinatorSvc.registerSearch(this, paramMap, getResourceName(), new CacheControlDirective().parse(theRequest.getHeaders(Constants.HEADER_CACHE_CONTROL)), theRequest, requestPartitionId);
}
@Override

View File

@ -20,9 +20,11 @@ package ca.uhn.fhir.jpa.dao.r5;
* #L%
*/
import ca.uhn.fhir.interceptor.model.RequestPartitionId;
import ca.uhn.fhir.jpa.dao.BaseHapiFhirResourceDaoObservation;
import ca.uhn.fhir.jpa.model.cross.IBasePersistedResource;
import ca.uhn.fhir.jpa.model.entity.ResourceTable;
import ca.uhn.fhir.jpa.partition.IRequestPartitionHelperSvc;
import ca.uhn.fhir.jpa.searchparam.SearchParameterMap;
import ca.uhn.fhir.rest.api.CacheControlDirective;
import ca.uhn.fhir.rest.api.Constants;
@ -31,18 +33,23 @@ import ca.uhn.fhir.rest.api.server.RequestDetails;
import ca.uhn.fhir.rest.api.server.storage.TransactionDetails;
import org.hl7.fhir.instance.model.api.IBaseResource;
import org.hl7.fhir.r5.model.Observation;
import org.springframework.beans.factory.annotation.Autowired;
import javax.servlet.http.HttpServletResponse;
import java.util.Date;
public class FhirResourceDaoObservationR5 extends BaseHapiFhirResourceDaoObservation<Observation> {
@Autowired
private IRequestPartitionHelperSvc myPartitionHelperSvc;
@Override
public IBundleProvider observationsLastN(SearchParameterMap theSearchParameterMap, RequestDetails theRequestDetails, HttpServletResponse theServletResponse) {
updateSearchParamsForLastn(theSearchParameterMap, theRequestDetails);
return mySearchCoordinatorSvc.registerSearch(this, theSearchParameterMap, getResourceName(), new CacheControlDirective().parse(theRequestDetails.getHeaders(Constants.HEADER_CACHE_CONTROL)), theRequestDetails);
RequestPartitionId requestPartitionId = myPartitionHelperSvc.determineReadPartitionForRequest(theRequestDetails, getResourceName());
return mySearchCoordinatorSvc.registerSearch(this, theSearchParameterMap, getResourceName(), new CacheControlDirective().parse(theRequestDetails.getHeaders(Constants.HEADER_CACHE_CONTROL)), theRequestDetails, requestPartitionId);
}
@Override

View File

@ -20,8 +20,10 @@ package ca.uhn.fhir.jpa.dao.r5;
* #L%
*/
import ca.uhn.fhir.interceptor.model.RequestPartitionId;
import ca.uhn.fhir.jpa.api.dao.IFhirResourceDaoPatient;
import ca.uhn.fhir.jpa.dao.BaseHapiFhirResourceDao;
import ca.uhn.fhir.jpa.partition.IRequestPartitionHelperSvc;
import ca.uhn.fhir.jpa.searchparam.SearchParameterMap;
import ca.uhn.fhir.jpa.searchparam.SearchParameterMap.EverythingModeEnum;
import ca.uhn.fhir.rest.api.CacheControlDirective;
@ -36,12 +38,16 @@ import org.hl7.fhir.instance.model.api.IBaseResource;
import org.hl7.fhir.instance.model.api.IIdType;
import org.hl7.fhir.instance.model.api.IPrimitiveType;
import org.hl7.fhir.r5.model.Patient;
import org.springframework.beans.factory.annotation.Autowired;
import javax.servlet.http.HttpServletRequest;
import java.util.Collections;
public class FhirResourceDaoPatientR5 extends BaseHapiFhirResourceDao<Patient> implements IFhirResourceDaoPatient<Patient> {
@Autowired
private IRequestPartitionHelperSvc myPartitionHelperSvc;
private IBundleProvider doEverythingOperation(IIdType theId, IPrimitiveType<Integer> theCount, IPrimitiveType<Integer> theOffset, DateRangeParam theLastUpdated, SortSpec theSort, StringAndListParam theContent, StringAndListParam theNarrative, RequestDetails theRequest) {
SearchParameterMap paramMap = new SearchParameterMap();
if (theCount != null) {
@ -68,7 +74,8 @@ public class FhirResourceDaoPatientR5 extends BaseHapiFhirResourceDao<Patient> i
paramMap.setLoadSynchronous(true);
}
return mySearchCoordinatorSvc.registerSearch(this, paramMap, getResourceName(), new CacheControlDirective().parse(theRequest.getHeaders(Constants.HEADER_CACHE_CONTROL)), theRequest);
RequestPartitionId requestPartitionId = myPartitionHelperSvc.determineReadPartitionForRequest(theRequest, getResourceName());
return mySearchCoordinatorSvc.registerSearch(this, paramMap, getResourceName(), new CacheControlDirective().parse(theRequest.getHeaders(Constants.HEADER_CACHE_CONTROL)), theRequest, requestPartitionId);
}
@Override

View File

@ -1,5 +1,6 @@
package ca.uhn.fhir.jpa.entity;
import ca.uhn.fhir.interceptor.model.RequestPartitionId;
import ca.uhn.fhir.jpa.model.search.SearchStatusEnum;
import ca.uhn.fhir.jpa.search.SearchCoordinatorSvcImpl;
import ca.uhn.fhir.jpa.searchparam.SearchParameterMap;
@ -12,9 +13,34 @@ import org.hibernate.annotations.OptimisticLock;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.persistence.*;
import javax.annotation.Nonnull;
import javax.persistence.Basic;
import javax.persistence.Column;
import javax.persistence.Entity;
import javax.persistence.EnumType;
import javax.persistence.Enumerated;
import javax.persistence.FetchType;
import javax.persistence.GeneratedValue;
import javax.persistence.GenerationType;
import javax.persistence.Id;
import javax.persistence.Index;
import javax.persistence.Lob;
import javax.persistence.OneToMany;
import javax.persistence.SequenceGenerator;
import javax.persistence.Table;
import javax.persistence.Temporal;
import javax.persistence.TemporalType;
import javax.persistence.UniqueConstraint;
import javax.persistence.Version;
import java.io.Serializable;
import java.util.*;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Date;
import java.util.HashSet;
import java.util.Optional;
import java.util.Set;
import java.util.UUID;
import static org.apache.commons.lang3.StringUtils.left;
@ -39,7 +65,7 @@ import static org.apache.commons.lang3.StringUtils.left;
*/
@Entity
@Table(name = "HFJ_SEARCH", uniqueConstraints = {
@Table(name = Search.HFJ_SEARCH, uniqueConstraints = {
@UniqueConstraint(name = "IDX_SEARCH_UUID", columnNames = "SEARCH_UUID")
}, indexes = {
@Index(name = "IDX_SEARCH_RESTYPE_HASHS", columnList = "RESOURCE_TYPE,SEARCH_QUERY_STRING_HASH,CREATED"),
@ -49,6 +75,7 @@ public class Search implements ICachedSearchDetails, Serializable {
@SuppressWarnings("WeakerAccess")
public static final int UUID_COLUMN_LENGTH = 36;
public static final String HFJ_SEARCH = "HFJ_SEARCH";
private static final int MAX_SEARCH_QUERY_STRING = 10000;
private static final int FAILURE_MESSAGE_LENGTH = 500;
private static final long serialVersionUID = 1L;
@ -89,6 +116,9 @@ public class Search implements ICachedSearchDetails, Serializable {
private Long myResourceId;
@Column(name = "RESOURCE_TYPE", length = 200, nullable = true)
private String myResourceType;
/**
* Note that this field may have the request partition IDs prepended to it
*/
@Lob()
@Basic(fetch = FetchType.LAZY)
@Column(name = "SEARCH_QUERY_STRING", nullable = true, updatable = false, length = MAX_SEARCH_QUERY_STRING)
@ -112,6 +142,7 @@ public class Search implements ICachedSearchDetails, Serializable {
@Lob
@Column(name = "SEARCH_PARAM_MAP", nullable = true)
private byte[] mySearchParameterMap;
/**
* Constructor
*/
@ -255,19 +286,27 @@ public class Search implements ICachedSearchDetails, Serializable {
myResourceType = theResourceType;
}
/**
* Note that this field may have the request partition IDs prepended to it
*/
public String getSearchQueryString() {
return mySearchQueryString;
}
public void setSearchQueryString(String theSearchQueryString) {
if (theSearchQueryString == null || theSearchQueryString.length() > MAX_SEARCH_QUERY_STRING) {
public void setSearchQueryString(String theSearchQueryString, RequestPartitionId theRequestPartitionId) {
String searchQueryString = null;
if (theSearchQueryString != null) {
searchQueryString = createSearchQueryStringForStorage(theSearchQueryString, theRequestPartitionId);
}
if (searchQueryString == null || searchQueryString.length() > MAX_SEARCH_QUERY_STRING) {
// We want this field to always have a wide distribution of values in order
// to avoid optimizers avoiding using it if it has lots of nulls, so in the
// case of null, just put a value that will never be hit
mySearchQueryString = UUID.randomUUID().toString();
} else {
mySearchQueryString = theSearchQueryString;
mySearchQueryString = searchQueryString;
}
mySearchQueryStringHash = mySearchQueryString.hashCode();
}
@ -310,10 +349,6 @@ public class Search implements ICachedSearchDetails, Serializable {
myLastUpdatedHigh = theUpperBound;
}
public void setSearchQueryStringHash(Integer theSearchQueryStringHash) {
mySearchQueryStringHash = theSearchQueryStringHash;
}
private Set<Include> toIncList(boolean theWantReverse) {
HashSet<Include> retVal = new HashSet<>();
for (SearchInclude next : getIncludes()) {
@ -352,4 +387,13 @@ public class Search implements ICachedSearchDetails, Serializable {
public void setCannotBeReused() {
mySearchQueryStringHash = null;
}
@Nonnull
public static String createSearchQueryStringForStorage(@Nonnull String theSearchQueryString, @Nonnull RequestPartitionId theRequestPartitionId) {
String searchQueryString = theSearchQueryString;
if (!theRequestPartitionId.isAllPartitions()) {
searchQueryString = RequestPartitionId.stringifyForKey(theRequestPartitionId) + " " + searchQueryString;
}
return searchQueryString;
}
}

View File

@ -34,6 +34,7 @@ import ca.uhn.fhir.jpa.dao.ISearchBuilder;
import ca.uhn.fhir.jpa.dao.SearchBuilderFactory;
import ca.uhn.fhir.jpa.entity.Search;
import ca.uhn.fhir.jpa.entity.SearchTypeEnum;
import ca.uhn.fhir.jpa.search.cache.SearchCacheStatusEnum;
import ca.uhn.fhir.rest.api.server.storage.ResourcePersistentId;
import ca.uhn.fhir.jpa.model.entity.BaseHasResource;
import ca.uhn.fhir.jpa.model.entity.ResourceHistoryTable;
@ -108,7 +109,7 @@ public class PersistedJpaBundleProvider implements IBundleProvider {
private final RequestDetails myRequest;
private Search mySearchEntity;
private String myUuid;
private boolean myCacheHit;
private SearchCacheStatusEnum myCacheStatus;
private RequestPartitionId myRequestPartitionId;
/**
@ -299,12 +300,12 @@ public class PersistedJpaBundleProvider implements IBundleProvider {
return myUuid;
}
public boolean isCacheHit() {
return myCacheHit;
public SearchCacheStatusEnum getCacheStatus() {
return myCacheStatus;
}
void setCacheHit() {
myCacheHit = true;
void setCacheStatus(SearchCacheStatusEnum theSearchCacheStatusEnum) {
myCacheStatus = theSearchCacheStatusEnum;
}
@Override

View File

@ -42,6 +42,7 @@ import ca.uhn.fhir.jpa.model.search.SearchStatusEnum;
import ca.uhn.fhir.jpa.partition.IRequestPartitionHelperSvc;
import ca.uhn.fhir.jpa.search.cache.ISearchCacheSvc;
import ca.uhn.fhir.jpa.search.cache.ISearchResultCacheSvc;
import ca.uhn.fhir.jpa.search.cache.SearchCacheStatusEnum;
import ca.uhn.fhir.jpa.searchparam.SearchParameterMap;
import ca.uhn.fhir.jpa.util.InterceptorUtil;
import ca.uhn.fhir.jpa.util.JpaInterceptorBroadcaster;
@ -101,7 +102,6 @@ import java.io.IOException;
import java.time.Instant;
import java.time.temporal.ChronoUnit;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Date;
import java.util.HashSet;
import java.util.Iterator;
@ -121,7 +121,7 @@ import static org.apache.commons.lang3.ObjectUtils.defaultIfNull;
public class SearchCoordinatorSvcImpl implements ISearchCoordinatorSvc {
public static final int DEFAULT_SYNC_SIZE = 250;
public static final String UNIT_TEST_CAPTURE_STACK = "unit_test_capture_stack";
public static final Integer INTEGER_0 = Integer.valueOf(0);
public static final Integer INTEGER_0 = 0;
private static final org.slf4j.Logger ourLog = org.slf4j.LoggerFactory.getLogger(SearchCoordinatorSvcImpl.class);
private final ConcurrentHashMap<String, SearchTask> myIdToSearchTask = new ConcurrentHashMap<>();
@Autowired
@ -130,7 +130,7 @@ public class SearchCoordinatorSvcImpl implements ISearchCoordinatorSvc {
private DaoConfig myDaoConfig;
@Autowired
private EntityManager myEntityManager;
private ExecutorService myExecutor;
private final ExecutorService myExecutor;
private Integer myLoadingThrottleForUnitTests = null;
private long myMaxMillisToWaitForRemoteResults = DateUtils.MILLIS_PER_MINUTE;
private boolean myNeverUseLocalSearchForUnitTests;
@ -302,7 +302,7 @@ public class SearchCoordinatorSvcImpl implements ISearchCoordinatorSvc {
}
@Override
public IBundleProvider registerSearch(final IFhirResourceDao<?> theCallingDao, final SearchParameterMap theParams, String theResourceType, CacheControlDirective theCacheControlDirective, RequestDetails theRequestDetails) {
public IBundleProvider registerSearch(final IFhirResourceDao<?> theCallingDao, final SearchParameterMap theParams, String theResourceType, CacheControlDirective theCacheControlDirective, RequestDetails theRequestDetails, RequestPartitionId theRequestPartitionId) {
final String searchUuid = UUID.randomUUID().toString();
ourLog.debug("Registering new search {}", searchUuid);
@ -315,29 +315,34 @@ public class SearchCoordinatorSvcImpl implements ISearchCoordinatorSvc {
if (theParams.isLoadSynchronous() || loadSynchronousUpTo != null) {
ourLog.debug("Search {} is loading in synchronous mode", searchUuid);
return executeQuery(theResourceType, theParams, theRequestDetails, searchUuid, sb, loadSynchronousUpTo);
return executeQuery(theResourceType, theParams, theRequestDetails, searchUuid, sb, loadSynchronousUpTo, theRequestPartitionId);
}
/*
* See if there are any cached searches whose results we can return
* instead
*/
boolean useCache = true;
SearchCacheStatusEnum cacheStatus = SearchCacheStatusEnum.MISS;
if (theCacheControlDirective != null && theCacheControlDirective.isNoCache() == true) {
useCache = false;
cacheStatus = SearchCacheStatusEnum.NOT_TRIED;
}
final String queryString = theParams.toNormalizedQueryString(myContext);
if (cacheStatus != SearchCacheStatusEnum.NOT_TRIED) {
if (theParams.getEverythingMode() == null) {
if (myDaoConfig.getReuseCachedSearchResultsForMillis() != null && useCache) {
IBundleProvider foundSearchProvider = findCachedQuery(theCallingDao, theParams, theResourceType, theRequestDetails, queryString);
if (myDaoConfig.getReuseCachedSearchResultsForMillis() != null) {
PersistedJpaBundleProvider foundSearchProvider = findCachedQuery(theParams, theResourceType, theRequestDetails, queryString, theRequestPartitionId);
if (foundSearchProvider != null) {
foundSearchProvider.setCacheStatus(SearchCacheStatusEnum.HIT);
return foundSearchProvider;
}
}
}
}
return submitSearch(theCallingDao, theParams, theResourceType, theRequestDetails, searchUuid, sb, queryString);
PersistedJpaSearchFirstPageBundleProvider retVal = submitSearch(theCallingDao, theParams, theResourceType, theRequestDetails, searchUuid, sb, queryString, theRequestPartitionId);
retVal.setCacheStatus(cacheStatus);
return retVal;
}
@ -374,10 +379,10 @@ public class SearchCoordinatorSvcImpl implements ISearchCoordinatorSvc {
}
@NotNull
private IBundleProvider submitSearch(IDao theCallingDao, SearchParameterMap theParams, String theResourceType, RequestDetails theRequestDetails, String theSearchUuid, ISearchBuilder theSb, String theQueryString) {
private PersistedJpaSearchFirstPageBundleProvider submitSearch(IDao theCallingDao, SearchParameterMap theParams, String theResourceType, RequestDetails theRequestDetails, String theSearchUuid, ISearchBuilder theSb, String theQueryString, RequestPartitionId theRequestPartitionId) {
StopWatch w = new StopWatch();
Search search = new Search();
populateSearchEntity(theParams, theResourceType, theSearchUuid, theQueryString, search);
populateSearchEntity(theParams, theResourceType, theSearchUuid, theQueryString, search, theRequestPartitionId);
// Interceptor call: STORAGE_PRESEARCH_REGISTERED
HookParams params = new HookParams()
@ -386,9 +391,7 @@ public class SearchCoordinatorSvcImpl implements ISearchCoordinatorSvc {
.addIfMatchesType(ServletRequestDetails.class, theRequestDetails);
JpaInterceptorBroadcaster.doCallHooks(myInterceptorBroadcaster, theRequestDetails, Pointcut.STORAGE_PRESEARCH_REGISTERED, params);
RequestPartitionId requestPartitionId = myRequestPartitionHelperService.determineReadPartitionForRequest(theRequestDetails, theResourceType);
SearchTask task = new SearchTask(search, theCallingDao, theParams, theResourceType, theRequestDetails, requestPartitionId);
SearchTask task = new SearchTask(search, theCallingDao, theParams, theResourceType, theRequestDetails, theRequestPartitionId);
myIdToSearchTask.put(search.getUuid(), task);
myExecutor.submit(task);
@ -399,9 +402,11 @@ public class SearchCoordinatorSvcImpl implements ISearchCoordinatorSvc {
}
@Nullable
private IBundleProvider findCachedQuery(IDao theCallingDao, SearchParameterMap theParams, String theResourceType, RequestDetails theRequestDetails, String theQueryString) {
private PersistedJpaBundleProvider findCachedQuery(SearchParameterMap theParams, String theResourceType, RequestDetails theRequestDetails, String theQueryString, RequestPartitionId theRequestPartitionId) {
TransactionTemplate txTemplate = new TransactionTemplate(myManagedTxManager);
PersistedJpaBundleProvider foundSearchProvider = txTemplate.execute(t -> {
// May be null
return txTemplate.execute(t -> {
// Interceptor call: STORAGE_PRECHECK_FOR_CACHED_SEARCH
HookParams params = new HookParams()
@ -414,7 +419,7 @@ public class SearchCoordinatorSvcImpl implements ISearchCoordinatorSvc {
}
// Check for a search matching the given hash
Search searchToUse = findSearchToUseOrNull(theQueryString, theResourceType);
Search searchToUse = findSearchToUseOrNull(theQueryString, theResourceType, theRequestPartitionId);
if (searchToUse == null) {
return null;
}
@ -427,36 +432,20 @@ public class SearchCoordinatorSvcImpl implements ISearchCoordinatorSvc {
.addIfMatchesType(ServletRequestDetails.class, theRequestDetails);
JpaInterceptorBroadcaster.doCallHooks(myInterceptorBroadcaster, theRequestDetails, Pointcut.JPA_PERFTRACE_SEARCH_REUSING_CACHED, params);
PersistedJpaBundleProvider retVal = myPersistedJpaBundleProviderFactory.newInstance(theRequestDetails, searchToUse.getUuid());
retVal.setCacheHit();
return retVal;
return myPersistedJpaBundleProviderFactory.newInstance(theRequestDetails, searchToUse.getUuid());
});
// May be null
return foundSearchProvider;
}
@Nullable
private Search findSearchToUseOrNull(String theQueryString, String theResourceType) {
Search searchToUse = null;
private Search findSearchToUseOrNull(String theQueryString, String theResourceType, RequestPartitionId theRequestPartitionId) {
// createdCutoff is in recent past
final Instant createdCutoff = Instant.now().minus(myDaoConfig.getReuseCachedSearchResultsForMillis(), ChronoUnit.MILLIS);
Collection<Search> candidates = mySearchCacheSvc.findCandidatesForReuse(theResourceType, theQueryString, theQueryString.hashCode(), Date.from(createdCutoff));
for (Search nextCandidateSearch : candidates) {
// We should only reuse our search if it was created within the permitted window
// Date.after() is unreliable. Instant.isAfter() always works.
if (theQueryString.equals(nextCandidateSearch.getSearchQueryString()) && nextCandidateSearch.getCreated().toInstant().isAfter(createdCutoff)) {
searchToUse = nextCandidateSearch;
break;
}
}
return searchToUse;
Optional<Search> candidate = mySearchCacheSvc.findCandidatesForReuse(theResourceType, theQueryString, createdCutoff, theRequestPartitionId);
return candidate.orElse(null);
}
private IBundleProvider executeQuery(String theResourceType, SearchParameterMap theParams, RequestDetails theRequestDetails, String theSearchUuid, ISearchBuilder theSb, Integer theLoadSynchronousUpTo) {
private IBundleProvider executeQuery(String theResourceType, SearchParameterMap theParams, RequestDetails theRequestDetails, String theSearchUuid, ISearchBuilder theSb, Integer theLoadSynchronousUpTo, RequestPartitionId theRequestPartitionId) {
SearchRuntimeDetails searchRuntimeDetails = new SearchRuntimeDetails(theRequestDetails, theSearchUuid);
searchRuntimeDetails.setLoadSynchronous(true);
@ -471,8 +460,6 @@ public class SearchCoordinatorSvcImpl implements ISearchCoordinatorSvc {
// Load the results synchronously
final List<ResourcePersistentId> pids = new ArrayList<>();
RequestPartitionId requestPartitionId = myRequestPartitionHelperService.determineReadPartitionForRequest(theRequestDetails, theResourceType);
Long count = 0L;
if (wantCount) {
ourLog.trace("Performing count");
@ -482,7 +469,7 @@ public class SearchCoordinatorSvcImpl implements ISearchCoordinatorSvc {
List<List<IQueryParameterType>> contentAndTerms = theParams.get(Constants.PARAM_CONTENT);
List<List<IQueryParameterType>> textAndTerms = theParams.get(Constants.PARAM_TEXT);
Iterator<Long> countIterator = theSb.createCountQuery(theParams, theSearchUuid, theRequestDetails, requestPartitionId);
Iterator<Long> countIterator = theSb.createCountQuery(theParams, theSearchUuid, theRequestDetails, theRequestPartitionId);
if (contentAndTerms != null) theParams.put(Constants.PARAM_CONTENT, contentAndTerms);
if (textAndTerms != null) theParams.put(Constants.PARAM_TEXT, textAndTerms);
@ -497,7 +484,7 @@ public class SearchCoordinatorSvcImpl implements ISearchCoordinatorSvc {
return bundleProvider;
}
try (IResultIterator resultIter = theSb.createQuery(theParams, searchRuntimeDetails, theRequestDetails, requestPartitionId)) {
try (IResultIterator resultIter = theSb.createQuery(theParams, searchRuntimeDetails, theRequestDetails, theRequestPartitionId)) {
while (resultIter.hasNext()) {
pids.add(resultIter.next());
if (theLoadSynchronousUpTo != null && pids.size() >= theLoadSynchronousUpTo) {
@ -756,9 +743,7 @@ public class SearchCoordinatorSvcImpl implements ISearchCoordinatorSvc {
private ISearchBuilder newSearchBuilder() {
Class<? extends IBaseResource> resourceTypeClass = myContext.getResourceDefinition(myResourceType).getImplementingClass();
ISearchBuilder sb = mySearchBuilderFactory.newSearchBuilder(myCallingDao, myResourceType, resourceTypeClass);
return sb;
return mySearchBuilderFactory.newSearchBuilder(myCallingDao, myResourceType, resourceTypeClass);
}
@Nonnull
@ -1241,7 +1226,7 @@ public class SearchCoordinatorSvcImpl implements ISearchCoordinatorSvc {
| INTEGER_0.equals(myParams.getCount());
}
public static void populateSearchEntity(SearchParameterMap theParams, String theResourceType, String theSearchUuid, String theQueryString, Search theSearch) {
public static void populateSearchEntity(SearchParameterMap theParams, String theResourceType, String theSearchUuid, String theQueryString, Search theSearch, RequestPartitionId theRequestPartitionId) {
theSearch.setDeleted(false);
theSearch.setUuid(theSearchUuid);
theSearch.setCreated(new Date());
@ -1252,9 +1237,7 @@ public class SearchCoordinatorSvcImpl implements ISearchCoordinatorSvc {
theSearch.setLastUpdated(theParams.getLastUpdated());
theSearch.setResourceType(theResourceType);
theSearch.setStatus(SearchStatusEnum.LOADING);
theSearch.setSearchQueryString(theQueryString);
theSearch.setSearchQueryStringHash(theQueryString.hashCode());
theSearch.setSearchQueryString(theQueryString, theRequestPartitionId);
for (Include next : theParams.getIncludes()) {
theSearch.addInclude(new SearchInclude(theSearch, next.getValue(), false, next.isRecurse()));

View File

@ -20,6 +20,7 @@ package ca.uhn.fhir.jpa.search.cache;
* #L%
*/
import ca.uhn.fhir.interceptor.model.RequestPartitionId;
import ca.uhn.fhir.jpa.api.config.DaoConfig;
import ca.uhn.fhir.jpa.dao.data.ISearchDao;
import ca.uhn.fhir.jpa.dao.data.ISearchIncludeDao;
@ -42,6 +43,7 @@ import org.springframework.transaction.TransactionDefinition;
import org.springframework.transaction.support.TransactionTemplate;
import javax.transaction.Transactional;
import java.time.Instant;
import java.util.Collection;
import java.util.Date;
import java.util.List;
@ -140,10 +142,21 @@ public class DatabaseSearchCacheSvcImpl implements ISearchCacheSvc {
}
@Override
public Collection<Search> findCandidatesForReuse(String theResourceType, String theQueryString, int theQueryStringHash, Date theCreatedAfter) {
int hashCode = theQueryString.hashCode();
return mySearchDao.findWithCutoffOrExpiry(theResourceType, hashCode, theCreatedAfter);
public Optional<Search> findCandidatesForReuse(String theResourceType, String theQueryString, Instant theCreatedAfter, RequestPartitionId theRequestPartitionId) {
String queryString = Search.createSearchQueryStringForStorage(theQueryString, theRequestPartitionId);
int hashCode = queryString.hashCode();
Collection<Search> candidates = mySearchDao.findWithCutoffOrExpiry(theResourceType, hashCode, Date.from(theCreatedAfter));
for (Search nextCandidateSearch : candidates) {
// We should only reuse our search if it was created within the permitted window
// Date.after() is unreliable. Instant.isAfter() always works.
if (queryString.equals(nextCandidateSearch.getSearchQueryString()) && nextCandidateSearch.getCreated().toInstant().isAfter(theCreatedAfter)) {
return Optional.of(nextCandidateSearch);
}
}
return Optional.empty();
}
@Transactional(Transactional.TxType.NEVER)

View File

@ -20,10 +20,10 @@ package ca.uhn.fhir.jpa.search.cache;
* #L%
*/
import ca.uhn.fhir.interceptor.model.RequestPartitionId;
import ca.uhn.fhir.jpa.entity.Search;
import java.util.Collection;
import java.util.Date;
import java.time.Instant;
import java.util.Optional;
public interface ISearchCacheSvc {
@ -65,18 +65,18 @@ public interface ISearchCacheSvc {
/**
* Look for any existing searches matching the given resource type and query string.
* <p>
* This method is allowed to perofrm a "best effort" return, so it can return searches that don't match the query string exactly, or
* This method is allowed to perform a "best effort" return, so it can return searches that don't match the query string exactly, or
* which have a created timestamp before <code>theCreatedAfter</code> date. The caller is responsible for removing
* any inappropriate Searches and picking the most relevant one.
* </p>
*
* @param theResourceType The resource type of the search. Results MUST match this type
* @param theQueryString The query string. Results SHOULD match this type
* @param theQueryStringHash The query string hash. Results SHOULD match this type
* @param theCreatedAfter Results SHOULD not include any searches created before this cutoff timestamp
* @param theRequestPartitionId Search should examine only the requested partitions. Cache MUST not return results matching the given partition IDs
* @return A collection of candidate searches
*/
Collection<Search> findCandidatesForReuse(String theResourceType, String theQueryString, int theQueryStringHash, Date theCreatedAfter);
Optional<Search> findCandidatesForReuse(String theResourceType, String theQueryString, Instant theCreatedAfter, RequestPartitionId theRequestPartitionId);
/**
* This method will be called periodically to delete stale searches. Implementations are not required to do anything

View File

@ -0,0 +1,9 @@
package ca.uhn.fhir.jpa.search.cache;
public enum SearchCacheStatusEnum {
NOT_TRIED,
MISS,
HIT
}

View File

@ -21,6 +21,7 @@ package ca.uhn.fhir.jpa.search.lastn;
*/
import ca.uhn.fhir.context.FhirContext;
import ca.uhn.fhir.jpa.model.config.PartitionSettings;
import ca.uhn.fhir.jpa.model.util.CodeSystemHash;
import ca.uhn.fhir.jpa.search.lastn.json.CodeJson;
import ca.uhn.fhir.jpa.search.lastn.json.ObservationJson;
@ -35,6 +36,7 @@ import ca.uhn.fhir.rest.server.exceptions.InvalidRequestException;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.annotations.VisibleForTesting;
import org.apache.commons.lang3.Validate;
import org.shadehapi.elasticsearch.action.DocWriteResponse;
import org.shadehapi.elasticsearch.action.admin.indices.create.CreateIndexRequest;
import org.shadehapi.elasticsearch.action.admin.indices.create.CreateIndexResponse;
@ -69,6 +71,7 @@ import org.shadehapi.elasticsearch.search.aggregations.metrics.tophits.ParsedTop
import org.shadehapi.elasticsearch.search.aggregations.support.ValueType;
import org.shadehapi.elasticsearch.search.builder.SearchSourceBuilder;
import org.shadehapi.elasticsearch.search.sort.SortOrder;
import org.springframework.beans.factory.annotation.Autowired;
import java.io.BufferedReader;
import java.io.IOException;
@ -117,6 +120,8 @@ public class ElasticsearchSvcImpl implements IElasticsearchSvc {
private final RestHighLevelClient myRestHighLevelClient;
private final ObjectMapper objectMapper = new ObjectMapper();
@Autowired
private PartitionSettings myPartitionSettings;
public ElasticsearchSvcImpl(String theHostname, int thePort, String theUsername, String thePassword) {
myRestHighLevelClient = ElasticsearchRestClientFactory.createElasticsearchHighLevelRestClient(theHostname, thePort, theUsername, thePassword);
@ -134,7 +139,7 @@ public class ElasticsearchSvcImpl implements IElasticsearchSvc {
BufferedReader reader = new BufferedReader(input);
StringBuilder sb = new StringBuilder();
String str;
while((str = reader.readLine())!= null){
while ((str = reader.readLine()) != null) {
sb.append(str);
}
@ -178,13 +183,15 @@ public class ElasticsearchSvcImpl implements IElasticsearchSvc {
@Override
public List<String> executeLastN(SearchParameterMap theSearchParameterMap, FhirContext theFhirContext, Integer theMaxResultsToFetch) {
Validate.isTrue(!myPartitionSettings.isPartitioningEnabled(), "$lastn is not currently supported on partitioned servers");
String[] topHitsInclude = {OBSERVATION_IDENTIFIER_FIELD_NAME};
return buildAndExecuteSearch(theSearchParameterMap, theFhirContext, topHitsInclude,
ObservationJson::getIdentifier, theMaxResultsToFetch);
}
private <T> List<T> buildAndExecuteSearch(SearchParameterMap theSearchParameterMap, FhirContext theFhirContext,
String[] topHitsInclude, Function<ObservationJson,T> setValue, Integer theMaxResultsToFetch) {
String[] topHitsInclude, Function<ObservationJson, T> setValue, Integer theMaxResultsToFetch) {
String patientParamName = LastNParameterHelper.getPatientParamName(theFhirContext);
String subjectParamName = LastNParameterHelper.getSubjectParamName(theFhirContext);
List<T> searchResults = new ArrayList<>();
@ -288,7 +295,7 @@ public class ElasticsearchSvcImpl implements IElasticsearchSvc {
return myRestHighLevelClient.search(searchRequest, RequestOptions.DEFAULT);
}
private <T> List<T> buildObservationList(SearchResponse theSearchResponse, Function<ObservationJson,T> setValue,
private <T> List<T> buildObservationList(SearchResponse theSearchResponse, Function<ObservationJson, T> setValue,
SearchParameterMap theSearchParameterMap, FhirContext theFhirContext,
Integer theMaxResultsToFetch) throws IOException {
List<T> theObservationList = new ArrayList<>();
@ -350,7 +357,7 @@ public class ElasticsearchSvcImpl implements IElasticsearchSvc {
private List<? extends Terms.Bucket> getObservationCodeBuckets(Aggregations theObservationCodeSystemAggregations) {
List<Terms.Bucket> retVal = new ArrayList<>();
ParsedTerms aggregatedObservationCodeSystems = theObservationCodeSystemAggregations.get(GROUP_BY_SYSTEM);
for(Terms.Bucket observationCodeSystem : aggregatedObservationCodeSystems.getBuckets()) {
for (Terms.Bucket observationCodeSystem : aggregatedObservationCodeSystems.getBuckets()) {
Aggregations observationCodeCodeAggregations = observationCodeSystem.getAggregations();
ParsedTerms aggregatedObservationCodeCodes = observationCodeCodeAggregations.get(GROUP_BY_CODE);
retVal.addAll(aggregatedObservationCodeCodes.getBuckets());
@ -647,7 +654,7 @@ public class ElasticsearchSvcImpl implements IElasticsearchSvc {
@Override
public CodeJson getObservationCodeDocument(String theCodeSystemHash, String theText) {
if(theCodeSystemHash == null && theText == null) {
if (theCodeSystemHash == null && theText == null) {
throw new InvalidRequestException("Require a non-null code system hash value or display value for observation code document query");
}
SearchRequest theSearchRequest = buildSingleObservationCodeSearchRequest(theCodeSystemHash, theText);
@ -687,7 +694,7 @@ public class ElasticsearchSvcImpl implements IElasticsearchSvc {
}
@Override
public Boolean createOrUpdateObservationIndex(String theDocumentId, ObservationJson theObservationDocument){
public Boolean createOrUpdateObservationIndex(String theDocumentId, ObservationJson theObservationDocument) {
try {
String documentToIndex = objectMapper.writeValueAsString(theObservationDocument);
return performIndex(OBSERVATION_INDEX, theDocumentId, documentToIndex, ElasticsearchSvcImpl.OBSERVATION_DOCUMENT_TYPE);

View File

@ -216,12 +216,14 @@ public class CircularQueueCaptureQueriesListener extends BaseCaptureQueriesListe
/**
* Log all captured SELECT queries
*/
public void logSelectQueries() {
List<String> queries = getSelectQueries()
public List<SqlQuery> logSelectQueries() {
List<SqlQuery> queries = getSelectQueries();
List<String> queriesStrings = queries
.stream()
.map(CircularQueueCaptureQueriesListener::formatQueryAsSql)
.collect(Collectors.toList());
ourLog.info("Select Queries:\n{}", String.join("\n", queries));
ourLog.info("Select Queries:\n{}", String.join("\n", queriesStrings));
return queries;
}
/**

View File

@ -27,6 +27,7 @@ public abstract class BaseJpaR4SystemTest extends BaseJpaR4Test {
patientRp.setDao(myPatientDao);
myServer.setResourceProviders(patientRp);
myServer.init(mock(ServletConfig.class));
myServer.setPagingProvider(myPagingProvider);
}
when(mySrd.getServer()).thenReturn(myServer);

View File

@ -0,0 +1,244 @@
package ca.uhn.fhir.jpa.dao.r4;
import ca.uhn.fhir.interceptor.api.Hook;
import ca.uhn.fhir.interceptor.api.Interceptor;
import ca.uhn.fhir.interceptor.api.Pointcut;
import ca.uhn.fhir.interceptor.model.RequestPartitionId;
import ca.uhn.fhir.jpa.api.config.DaoConfig;
import ca.uhn.fhir.jpa.entity.PartitionEntity;
import ca.uhn.fhir.jpa.model.config.PartitionSettings;
import ca.uhn.fhir.jpa.model.entity.ForcedId;
import ca.uhn.fhir.jpa.model.util.JpaConstants;
import ca.uhn.fhir.jpa.partition.IPartitionLookupSvc;
import ca.uhn.fhir.jpa.searchparam.SearchParameterMap;
import ca.uhn.fhir.rest.server.servlet.ServletRequestDetails;
import ca.uhn.fhir.util.HapiExtensions;
import org.apache.commons.lang3.Validate;
import org.hl7.fhir.instance.model.api.IBaseResource;
import org.hl7.fhir.r4.model.BooleanType;
import org.hl7.fhir.r4.model.Enumerations;
import org.hl7.fhir.r4.model.SearchParameter;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.springframework.beans.factory.annotation.Autowired;
import javax.servlet.ServletException;
import java.time.LocalDate;
import java.time.Month;
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.mockito.Mockito.when;
public abstract class BasePartitioningR4Test extends BaseJpaR4SystemTest {
private static final org.slf4j.Logger ourLog = org.slf4j.LoggerFactory.getLogger(PartitioningSqlR4Test.class);
static final String PARTITION_1 = "PART-1";
static final String PARTITION_2 = "PART-2";
static final String PARTITION_3 = "PART-3";
static final String PARTITION_4 = "PART-4";
protected MyReadWriteInterceptor myPartitionInterceptor;
protected LocalDate myPartitionDate;
protected LocalDate myPartitionDate2;
protected int myPartitionId;
protected int myPartitionId2;
private boolean myHaveDroppedForcedIdUniqueConstraint;
@Autowired
private IPartitionLookupSvc myPartitionConfigSvc;
@AfterEach
public void after() {
myPartitionInterceptor.assertNoRemainingIds();
myPartitionSettings.setIncludePartitionInSearchHashes(new PartitionSettings().isIncludePartitionInSearchHashes());
myPartitionSettings.setPartitioningEnabled(new PartitionSettings().isPartitioningEnabled());
myPartitionSettings.setAllowReferencesAcrossPartitions(new PartitionSettings().getAllowReferencesAcrossPartitions());
mySrdInterceptorService.unregisterInterceptorsIf(t -> t instanceof MyReadWriteInterceptor);
myInterceptor = null;
if (myHaveDroppedForcedIdUniqueConstraint) {
runInTransaction(() -> {
myEntityManager.createNativeQuery("delete from HFJ_FORCED_ID").executeUpdate();
myEntityManager.createNativeQuery("alter table HFJ_FORCED_ID add constraint IDX_FORCEDID_TYPE_FID unique (RESOURCE_TYPE, FORCED_ID)");
});
}
myDaoConfig.setIndexMissingFields(new DaoConfig().getIndexMissingFields());
myDaoConfig.setAutoCreatePlaceholderReferenceTargets(new DaoConfig().isAutoCreatePlaceholderReferenceTargets());
}
@BeforeEach
public void before() throws ServletException {
myPartitionSettings.setPartitioningEnabled(true);
myPartitionSettings.setIncludePartitionInSearchHashes(new PartitionSettings().isIncludePartitionInSearchHashes());
myDaoConfig.setUniqueIndexesEnabled(true);
myModelConfig.setDefaultSearchParamsCanBeOverridden(true);
myPartitionDate = LocalDate.of(2020, Month.JANUARY, 14);
myPartitionDate2 = LocalDate.of(2020, Month.JANUARY, 15);
myPartitionId = 1;
myPartitionId2 = 2;
myPartitionInterceptor = new MyReadWriteInterceptor();
mySrdInterceptorService.registerInterceptor(myPartitionInterceptor);
myPartitionConfigSvc.createPartition(new PartitionEntity().setId(1).setName(PARTITION_1));
myPartitionConfigSvc.createPartition(new PartitionEntity().setId(2).setName(PARTITION_2));
myPartitionConfigSvc.createPartition(new PartitionEntity().setId(3).setName(PARTITION_3));
myPartitionConfigSvc.createPartition(new PartitionEntity().setId(4).setName(PARTITION_4));
myDaoConfig.setIndexMissingFields(DaoConfig.IndexEnabledEnum.ENABLED);
// Ensure the partition names are resolved
myPartitionInterceptor.addReadPartition(RequestPartitionId.fromPartitionNames(JpaConstants.DEFAULT_PARTITION_NAME, PARTITION_1, PARTITION_2, PARTITION_3, PARTITION_4));
myPatientDao.search(new SearchParameterMap().setLoadSynchronous(true), mySrd);
}
protected void createUniqueCompositeSp() {
SearchParameter sp = new SearchParameter();
sp.setId("SearchParameter/patient-birthdate");
sp.setType(Enumerations.SearchParamType.DATE);
sp.setCode("birthdate");
sp.setExpression("Patient.birthDate");
sp.setStatus(Enumerations.PublicationStatus.ACTIVE);
sp.addBase("Patient");
mySearchParameterDao.update(sp);
sp = new SearchParameter();
sp.setId("SearchParameter/patient-birthdate-unique");
sp.setType(Enumerations.SearchParamType.COMPOSITE);
sp.setStatus(Enumerations.PublicationStatus.ACTIVE);
sp.addBase("Patient");
sp.addComponent()
.setExpression("Patient")
.setDefinition("SearchParameter/patient-birthdate");
sp.addExtension()
.setUrl(HapiExtensions.EXT_SP_UNIQUE)
.setValue(new BooleanType(true));
mySearchParameterDao.update(sp);
mySearchParamRegistry.forceRefresh();
}
protected void dropForcedIdUniqueConstraint() {
runInTransaction(() -> {
myEntityManager.createNativeQuery("alter table " + ForcedId.HFJ_FORCED_ID + " drop constraint " + ForcedId.IDX_FORCEDID_TYPE_FID).executeUpdate();
});
myHaveDroppedForcedIdUniqueConstraint = true;
}
protected void addCreatePartition(Integer thePartitionId, LocalDate thePartitionDate) {
Validate.notNull(thePartitionId);
RequestPartitionId requestPartitionId = RequestPartitionId.fromPartitionId(thePartitionId, thePartitionDate);
myPartitionInterceptor.addCreatePartition(requestPartitionId);
}
protected void addCreateDefaultPartition() {
myPartitionInterceptor.addCreatePartition(RequestPartitionId.defaultPartition());
}
protected void addCreateDefaultPartition(LocalDate thePartitionDate) {
RequestPartitionId requestPartitionId = RequestPartitionId.fromPartitionId(null, thePartitionDate);
myPartitionInterceptor.addCreatePartition(requestPartitionId);
}
protected void addReadPartition(Integer... thePartitionId) {
Validate.notNull(thePartitionId);
myPartitionInterceptor.addReadPartition(RequestPartitionId.fromPartitionIds(thePartitionId));
}
protected void addReadPartitions(String... thePartitionNames) {
Validate.notNull(thePartitionNames);
Validate.isTrue(thePartitionNames.length > 0);
myPartitionInterceptor.addReadPartition(RequestPartitionId.fromPartitionNames(thePartitionNames));
}
protected void addReadDefaultPartition() {
myPartitionInterceptor.addReadPartition(RequestPartitionId.defaultPartition());
}
protected void addReadAllPartitions() {
myPartitionInterceptor.addReadPartition(RequestPartitionId.allPartitions());
}
public void createRequestId() {
when(mySrd.getRequestId()).thenReturn("REQUEST_ID");
}
protected Consumer<IBaseResource> withPartition(Integer thePartitionId) {
return t -> {
if (thePartitionId != null) {
addCreatePartition(thePartitionId, null);
} else {
addCreateDefaultPartition();
}
};
}
protected Consumer<IBaseResource> withPutPartition(Integer thePartitionId) {
return t -> {
if (thePartitionId != null) {
addReadPartition(thePartitionId);
addCreatePartition(thePartitionId, null);
} else {
addReadDefaultPartition();
addCreateDefaultPartition();
}
};
}
@Interceptor
public static class MyReadWriteInterceptor extends MyWriteInterceptor {
private final List<RequestPartitionId> myReadRequestPartitionIds = new ArrayList<>();
public void addReadPartition(RequestPartitionId theRequestPartitionId) {
myReadRequestPartitionIds.add(theRequestPartitionId);
}
@Hook(Pointcut.STORAGE_PARTITION_IDENTIFY_READ)
public RequestPartitionId PartitionIdentifyRead(ServletRequestDetails theRequestDetails) {
RequestPartitionId retVal = myReadRequestPartitionIds.remove(0);
ourLog.info("Returning partition for read: {}", retVal);
return retVal;
}
@Override
public void assertNoRemainingIds() {
super.assertNoRemainingIds();
assertEquals(0, myReadRequestPartitionIds.size());
}
}
@Interceptor
public static class MyWriteInterceptor {
private final List<RequestPartitionId> myCreateRequestPartitionIds = new ArrayList<>();
public void addCreatePartition(RequestPartitionId theRequestPartitionId) {
myCreateRequestPartitionIds.add(theRequestPartitionId);
}
@Hook(Pointcut.STORAGE_PARTITION_IDENTIFY_CREATE)
public RequestPartitionId PartitionIdentifyCreate(IBaseResource theResource, ServletRequestDetails theRequestDetails) {
assertNotNull(theResource);
RequestPartitionId retVal = myCreateRequestPartitionIds.remove(0);
ourLog.info("Returning partition for create: {}", retVal);
return retVal;
}
public void assertNoRemainingIds() {
assertEquals(0, myCreateRequestPartitionIds.size());
}
}
}

View File

@ -4,15 +4,14 @@ import ca.uhn.fhir.context.ConfigurationException;
import ca.uhn.fhir.jpa.api.config.DaoConfig;
import ca.uhn.fhir.jpa.api.model.WarmCacheEntry;
import ca.uhn.fhir.jpa.search.PersistedJpaBundleProvider;
import ca.uhn.fhir.jpa.search.cache.SearchCacheStatusEnum;
import ca.uhn.fhir.jpa.search.warm.CacheWarmingSvcImpl;
import ca.uhn.fhir.jpa.searchparam.SearchParameterMap;
import ca.uhn.fhir.parser.DataFormatException;
import ca.uhn.fhir.rest.api.server.IBundleProvider;
import ca.uhn.fhir.rest.param.StringParam;
import ca.uhn.fhir.util.TestUtil;
import org.hl7.fhir.r4.model.Patient;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -102,7 +101,7 @@ public class FhirResourceDaoR4CacheWarmingTest extends BaseJpaR4Test {
assertEquals(PersistedJpaBundleProvider.class, result.getClass());
PersistedJpaBundleProvider resultCasted = (PersistedJpaBundleProvider) result;
assertTrue(resultCasted.isCacheHit());
assertEquals(SearchCacheStatusEnum.HIT, resultCasted.getCacheStatus());
}
}

View File

@ -174,7 +174,7 @@ public class FhirResourceDaoR4SearchOptimizedTest extends BaseJpaR4Test {
assertThat(ids, hasSize(10));
PersistedJpaBundleProvider bundleProvider = (PersistedJpaBundleProvider) myDatabaseBackedPagingProvider.retrieveResultList(null, uuid);
Integer bundleSize = bundleProvider.size();
assertNotNull(bundleSize, "Null size from provider of type " + bundleProvider.getClass() + " - Cache hit: " + bundleProvider.isCacheHit());
assertNotNull(bundleSize, "Null size from provider of type " + bundleProvider.getClass() + " - Cache hit: " + bundleProvider.getCacheStatus());
assertEquals(201, bundleSize.intValue());
// Search with count only

View File

@ -1,5 +1,6 @@
package ca.uhn.fhir.jpa.dao.r4;
import ca.uhn.fhir.interceptor.model.RequestPartitionId;
import ca.uhn.fhir.jpa.api.config.DaoConfig;
import ca.uhn.fhir.jpa.api.dao.IFhirResourceDao;
import ca.uhn.fhir.jpa.dao.BaseHapiFhirDao;
@ -3887,7 +3888,7 @@ public class FhirResourceDaoR4Test extends BaseJpaR4Test {
runInTransaction(() -> {
Search search = new Search();
SearchCoordinatorSvcImpl.populateSearchEntity(map, "Encounter", uuid, normalized, search);
SearchCoordinatorSvcImpl.populateSearchEntity(map, "Encounter", uuid, normalized, search, RequestPartitionId.allPartitions());
search.setStatus(SearchStatusEnum.FAILED);
search.setFailureCode(500);
search.setFailureMessage("FOO");

View File

@ -0,0 +1,146 @@
package ca.uhn.fhir.jpa.dao.r4;
import ca.uhn.fhir.jpa.search.PersistedJpaBundleProvider;
import ca.uhn.fhir.jpa.search.cache.SearchCacheStatusEnum;
import ca.uhn.fhir.jpa.searchparam.SearchParameterMap;
import ca.uhn.fhir.jpa.util.SqlQuery;
import org.apache.commons.lang3.StringUtils;
import org.hl7.fhir.instance.model.api.IIdType;
import org.junit.jupiter.api.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.List;
import java.util.stream.Collectors;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.containsInAnyOrder;
import static org.junit.jupiter.api.Assertions.assertEquals;
@SuppressWarnings("unchecked")
public class PartitioningSearchCacheR4Test extends BasePartitioningR4Test {
private static final Logger ourLog = LoggerFactory.getLogger(PartitioningSearchCacheR4Test.class);
@Test
public void testSearch_OnePartition_UseCache() {
createPatient(withPartition(null), withActiveTrue());
createPatient(withPartition(null), withActiveFalse());
IIdType patientId11 = createPatient(withPartition(1), withActiveTrue());
IIdType patientId12 = createPatient(withPartition(1), withActiveFalse());
IIdType patientId21 = createPatient(withPartition(2), withActiveTrue());
IIdType patientId22 = createPatient(withPartition(2), withActiveFalse());
{
myCaptureQueriesListener.clear();
addReadPartition(1);
PersistedJpaBundleProvider outcome = (PersistedJpaBundleProvider) myPatientDao.search(new SearchParameterMap(), mySrd);
assertEquals(SearchCacheStatusEnum.MISS, outcome.getCacheStatus());
assertEquals(2, outcome.sizeOrThrowNpe());
List<SqlQuery> selectQueries = myCaptureQueriesListener.getSelectQueries();
String searchSql = selectQueries.get(0).getSql(true, false);
assertEquals(1, StringUtils.countMatches(searchSql, "from HFJ_SEARCH "), searchSql);
assertEquals(0, StringUtils.countMatches(searchSql, "PARTITION_ID"), searchSql);
List<IIdType> ids = toUnqualifiedVersionlessIds(outcome);
assertThat(ids, containsInAnyOrder(patientId11, patientId12));
}
// Try from a different partition
{
myCaptureQueriesListener.clear();
addReadPartition(2);
PersistedJpaBundleProvider outcome = (PersistedJpaBundleProvider) myPatientDao.search(new SearchParameterMap(), mySrd);
assertEquals(SearchCacheStatusEnum.MISS, outcome.getCacheStatus());
assertEquals(2, outcome.sizeOrThrowNpe());
List<SqlQuery> selectQueries = myCaptureQueriesListener.getSelectQueries();
String searchSql = selectQueries.get(0).getSql(true, false);
assertEquals(1, StringUtils.countMatches(searchSql, "from HFJ_SEARCH "), searchSql);
assertEquals(0, StringUtils.countMatches(searchSql, "PARTITION_ID"), searchSql);
List<IIdType> ids = toUnqualifiedVersionlessIds(outcome);
assertThat(ids, containsInAnyOrder(patientId21, patientId22));
}
// Try from the first partition, should be a cache hit this time
{
myCaptureQueriesListener.clear();
addReadPartition(2);
PersistedJpaBundleProvider outcome = (PersistedJpaBundleProvider) myPatientDao.search(new SearchParameterMap(), mySrd);
assertEquals(SearchCacheStatusEnum.HIT, outcome.getCacheStatus());
assertEquals(2, outcome.sizeOrThrowNpe());
List<SqlQuery> selectQueries = myCaptureQueriesListener.logSelectQueries();
String searchSql = selectQueries.get(0).getSql(true, false);
assertEquals(1, StringUtils.countMatches(searchSql, "from HFJ_SEARCH "), searchSql);
assertEquals(0, StringUtils.countMatches(searchSql, "PARTITION_ID"), searchSql);
List<IIdType> ids = toUnqualifiedVersionlessIds(outcome);
assertThat(ids, containsInAnyOrder(patientId21, patientId22));
}
}
@Test
public void testSearch_MultiplePartitions_UseCache() {
IIdType patientIdNull1 = createPatient(withPartition(null), withActiveTrue());
IIdType patientIdNull2 = createPatient(withPartition(null), withActiveFalse());
IIdType patientId11 = createPatient(withPartition(1), withActiveTrue());
IIdType patientId12 = createPatient(withPartition(1), withActiveFalse());
IIdType patientId21 = createPatient(withPartition(2), withActiveTrue());
IIdType patientId22 = createPatient(withPartition(2), withActiveFalse());
{
myCaptureQueriesListener.clear();
addReadPartition(1, null);
PersistedJpaBundleProvider outcome = (PersistedJpaBundleProvider) myPatientDao.search(new SearchParameterMap(), mySrd);
assertEquals(SearchCacheStatusEnum.MISS, outcome.getCacheStatus());
assertEquals(4, outcome.sizeOrThrowNpe());
List<SqlQuery> selectQueries = myCaptureQueriesListener.getSelectQueries();
String searchSql = selectQueries.get(0).getSql(true, false);
assertEquals(1, StringUtils.countMatches(searchSql, "from HFJ_SEARCH "), searchSql);
assertEquals(0, StringUtils.countMatches(searchSql, "PARTITION_ID"), searchSql);
List<IIdType> ids = toUnqualifiedVersionlessIds(outcome);
assertThat(ids, containsInAnyOrder(patientId11, patientId12, patientIdNull1, patientIdNull2));
}
// Try from a different partition
{
myCaptureQueriesListener.clear();
addReadPartition(2, 1);
PersistedJpaBundleProvider outcome = (PersistedJpaBundleProvider) myPatientDao.search(new SearchParameterMap(), mySrd);
assertEquals(SearchCacheStatusEnum.MISS, outcome.getCacheStatus());
assertEquals(4, outcome.sizeOrThrowNpe());
List<SqlQuery> selectQueries = myCaptureQueriesListener.getSelectQueries();
String searchSql = selectQueries.get(0).getSql(true, false);
assertEquals(1, StringUtils.countMatches(searchSql, "from HFJ_SEARCH "), searchSql);
assertEquals(0, StringUtils.countMatches(searchSql, "PARTITION_ID"), searchSql);
List<IIdType> ids = toUnqualifiedVersionlessIds(outcome);
assertThat(ids, containsInAnyOrder(patientId11, patientId12, patientId21, patientId22));
}
// Try from the first partition, should be a cache hit this time
{
myCaptureQueriesListener.clear();
addReadPartition(1, null);
PersistedJpaBundleProvider outcome = (PersistedJpaBundleProvider) myPatientDao.search(new SearchParameterMap(), mySrd);
assertEquals(SearchCacheStatusEnum.HIT, outcome.getCacheStatus());
assertEquals(4, outcome.sizeOrThrowNpe());
List<SqlQuery> selectQueries = myCaptureQueriesListener.getSelectQueries();
String searchSql = selectQueries.get(0).getSql(true, false);
assertEquals(1, StringUtils.countMatches(searchSql, "from HFJ_SEARCH "), searchSql);
assertEquals(0, StringUtils.countMatches(searchSql, "PARTITION_ID"), searchSql);
List<IIdType> ids = toUnqualifiedVersionlessIds(outcome);
assertThat(ids, containsInAnyOrder(patientId11, patientId12, patientIdNull1, patientIdNull2));
}
}
}

View File

@ -1,13 +1,10 @@
package ca.uhn.fhir.jpa.dao.r4;
import ca.uhn.fhir.interceptor.api.Hook;
import ca.uhn.fhir.interceptor.api.HookParams;
import ca.uhn.fhir.interceptor.api.IAnonymousInterceptor;
import ca.uhn.fhir.interceptor.api.Interceptor;
import ca.uhn.fhir.interceptor.api.Pointcut;
import ca.uhn.fhir.interceptor.model.RequestPartitionId;
import ca.uhn.fhir.jpa.api.config.DaoConfig;
import ca.uhn.fhir.jpa.entity.PartitionEntity;
import ca.uhn.fhir.jpa.model.config.PartitionSettings;
import ca.uhn.fhir.jpa.model.entity.ForcedId;
import ca.uhn.fhir.jpa.model.entity.PartitionablePartitionId;
@ -21,7 +18,6 @@ import ca.uhn.fhir.jpa.model.entity.ResourceTable;
import ca.uhn.fhir.jpa.model.entity.ResourceTag;
import ca.uhn.fhir.jpa.model.entity.SearchParamPresent;
import ca.uhn.fhir.jpa.model.util.JpaConstants;
import ca.uhn.fhir.jpa.partition.IPartitionLookupSvc;
import ca.uhn.fhir.jpa.searchparam.SearchParameterMap;
import ca.uhn.fhir.jpa.util.SqlQuery;
import ca.uhn.fhir.rest.api.Constants;
@ -42,14 +38,10 @@ import ca.uhn.fhir.rest.server.exceptions.InvalidRequestException;
import ca.uhn.fhir.rest.server.exceptions.PreconditionFailedException;
import ca.uhn.fhir.rest.server.exceptions.ResourceNotFoundException;
import ca.uhn.fhir.rest.server.exceptions.UnprocessableEntityException;
import ca.uhn.fhir.rest.server.servlet.ServletRequestDetails;
import ca.uhn.fhir.util.HapiExtensions;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.Validate;
import org.hamcrest.Matchers;
import org.hl7.fhir.instance.model.api.IBaseResource;
import org.hl7.fhir.instance.model.api.IAnyResource;
import org.hl7.fhir.instance.model.api.IIdType;
import org.hl7.fhir.r4.model.BooleanType;
import org.hl7.fhir.r4.model.Bundle;
import org.hl7.fhir.r4.model.Enumerations;
import org.hl7.fhir.r4.model.IdType;
@ -59,19 +51,13 @@ import org.hl7.fhir.r4.model.Patient;
import org.hl7.fhir.r4.model.Practitioner;
import org.hl7.fhir.r4.model.PractitionerRole;
import org.hl7.fhir.r4.model.SearchParameter;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.mockito.ArgumentCaptor;
import org.springframework.beans.factory.annotation.Autowired;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.servlet.ServletException;
import java.time.LocalDate;
import java.time.Month;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import static ca.uhn.fhir.jpa.util.TestUtil.sleepAtLeast;
@ -89,76 +75,10 @@ import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
@SuppressWarnings("unchecked")
public class PartitioningSqlR4Test extends BaseJpaR4SystemTest {
static final String PARTITION_1 = "PART-1";
static final String PARTITION_2 = "PART-2";
static final String PARTITION_3 = "PART-3";
static final String PARTITION_4 = "PART-4";
private static final org.slf4j.Logger ourLog = org.slf4j.LoggerFactory.getLogger(PartitioningSqlR4Test.class);
private MyReadWriteInterceptor myPartitionInterceptor;
private LocalDate myPartitionDate;
private LocalDate myPartitionDate2;
private int myPartitionId;
private int myPartitionId2;
private boolean myHaveDroppedForcedIdUniqueConstraint;
@Autowired
private IPartitionLookupSvc myPartitionConfigSvc;
@AfterEach
public void after() {
myPartitionInterceptor.assertNoRemainingIds();
myPartitionSettings.setIncludePartitionInSearchHashes(new PartitionSettings().isIncludePartitionInSearchHashes());
myPartitionSettings.setPartitioningEnabled(new PartitionSettings().isPartitioningEnabled());
myPartitionSettings.setAllowReferencesAcrossPartitions(new PartitionSettings().getAllowReferencesAcrossPartitions());
mySrdInterceptorService.unregisterInterceptorsIf(t -> t instanceof MyReadWriteInterceptor);
myInterceptor = null;
if (myHaveDroppedForcedIdUniqueConstraint) {
runInTransaction(() -> {
myEntityManager.createNativeQuery("delete from HFJ_FORCED_ID").executeUpdate();
myEntityManager.createNativeQuery("alter table HFJ_FORCED_ID add constraint IDX_FORCEDID_TYPE_FID unique (RESOURCE_TYPE, FORCED_ID)");
});
}
myDaoConfig.setIndexMissingFields(new DaoConfig().getIndexMissingFields());
myDaoConfig.setAutoCreatePlaceholderReferenceTargets(new DaoConfig().isAutoCreatePlaceholderReferenceTargets());
}
@BeforeEach
public void before() throws ServletException {
myPartitionSettings.setPartitioningEnabled(true);
myPartitionSettings.setIncludePartitionInSearchHashes(new PartitionSettings().isIncludePartitionInSearchHashes());
myDaoConfig.setUniqueIndexesEnabled(true);
myModelConfig.setDefaultSearchParamsCanBeOverridden(true);
myPartitionDate = LocalDate.of(2020, Month.JANUARY, 14);
myPartitionDate2 = LocalDate.of(2020, Month.JANUARY, 15);
myPartitionId = 1;
myPartitionId2 = 2;
myPartitionInterceptor = new MyReadWriteInterceptor();
mySrdInterceptorService.registerInterceptor(myPartitionInterceptor);
myPartitionConfigSvc.createPartition(new PartitionEntity().setId(1).setName(PARTITION_1));
myPartitionConfigSvc.createPartition(new PartitionEntity().setId(2).setName(PARTITION_2));
myPartitionConfigSvc.createPartition(new PartitionEntity().setId(3).setName(PARTITION_3));
myPartitionConfigSvc.createPartition(new PartitionEntity().setId(4).setName(PARTITION_4));
myDaoConfig.setIndexMissingFields(DaoConfig.IndexEnabledEnum.ENABLED);
// Ensure the partition names are resolved
myPartitionInterceptor.addReadPartition(RequestPartitionId.fromPartitionNames(JpaConstants.DEFAULT_PARTITION_NAME, PARTITION_1, PARTITION_2, PARTITION_3, PARTITION_4));
myPatientDao.search(new SearchParameterMap().setLoadSynchronous(true), mySrd);
}
@SuppressWarnings({"unchecked", "ConstantConditions"})
public class PartitioningSqlR4Test extends BasePartitioningR4Test {
private static final Logger ourLog = LoggerFactory.getLogger(PartitioningSqlR4Test.class);
@Test
public void testCreateSearchParameter_DefaultPartition() {
@ -173,7 +93,7 @@ public class PartitioningSqlR4Test extends BaseJpaR4SystemTest {
runInTransaction(() -> {
ResourceTable resourceTable = myResourceTableDao.findById(id).orElseThrow(IllegalArgumentException::new);
assertEquals(null, resourceTable.getPartitionId());
assertNull(resourceTable.getPartitionId());
});
}
@ -424,7 +344,7 @@ public class PartitioningSqlR4Test extends BaseJpaR4SystemTest {
runInTransaction(() -> {
ResourceTable resourceTable = myResourceTableDao.findById(patientId).orElseThrow(IllegalArgumentException::new);
assertEquals(null, resourceTable.getPartitionId());
assertNull(resourceTable.getPartitionId());
});
}
@ -701,7 +621,7 @@ public class PartitioningSqlR4Test extends BaseJpaR4SystemTest {
.getRequest().setUrl("Organization").setMethod(Bundle.HTTPVerb.POST);
Patient p = new Patient();
p.getMeta().addTag("http://system", "code", "diisplay");
p.getMeta().addTag("http://system", "code", "display");
p.addName().setFamily("FAM");
p.addIdentifier().setSystem("system").setValue("value");
p.setBirthDate(new Date());
@ -730,7 +650,7 @@ public class PartitioningSqlR4Test extends BaseJpaR4SystemTest {
// Create a resource
addCreatePartition(myPartitionId, myPartitionDate);
Patient patient = new Patient();
patient.getMeta().addTag("http://system", "code", "diisplay");
patient.getMeta().addTag("http://system", "code", "display");
patient.setActive(true);
Long patientId = myPatientDao.create(patient, mySrd).getId().getIdPartAsLong();
runInTransaction(() -> {
@ -1182,10 +1102,10 @@ public class PartitioningSqlR4Test extends BaseJpaR4SystemTest {
myCaptureQueriesListener.clear();
addReadPartition(1);
SearchParameterMap map = SearchParameterMap.newSynchronous(Patient.SP_RES_ID, new TokenParam(patientId1.toUnqualifiedVersionless().getValue()));
SearchParameterMap map = SearchParameterMap.newSynchronous(IAnyResource.SP_RES_ID, new TokenParam(patientId1.toUnqualifiedVersionless().getValue()));
IBundleProvider searchOutcome = myPatientDao.search(map, mySrd);
assertEquals(1, searchOutcome.size());
IIdType gotId1 = searchOutcome.getResources(0,1).get(0).getIdElement().toUnqualifiedVersionless();
IIdType gotId1 = searchOutcome.getResources(0, 1).get(0).getIdElement().toUnqualifiedVersionless();
assertEquals(patientId1, gotId1);
String searchSql = myCaptureQueriesListener.getSelectQueriesForCurrentThread().get(0).getSql(true, false);
@ -1200,7 +1120,7 @@ public class PartitioningSqlR4Test extends BaseJpaR4SystemTest {
{
addReadPartition(1);
SearchParameterMap map = SearchParameterMap.newSynchronous(Patient.SP_RES_ID, new TokenParam(patientIdNull.toUnqualifiedVersionless().getValue()));
SearchParameterMap map = SearchParameterMap.newSynchronous(IAnyResource.SP_RES_ID, new TokenParam(patientIdNull.toUnqualifiedVersionless().getValue()));
IBundleProvider searchOutcome = myPatientDao.search(map, mySrd);
assertEquals(0, searchOutcome.size());
}
@ -1209,7 +1129,7 @@ public class PartitioningSqlR4Test extends BaseJpaR4SystemTest {
{
addReadPartition(1);
SearchParameterMap map = SearchParameterMap.newSynchronous(Patient.SP_RES_ID, new TokenParam(patientId2.toUnqualifiedVersionless().getValue()));
SearchParameterMap map = SearchParameterMap.newSynchronous(IAnyResource.SP_RES_ID, new TokenParam(patientId2.toUnqualifiedVersionless().getValue()));
IBundleProvider searchOutcome = myPatientDao.search(map, mySrd);
assertEquals(0, searchOutcome.size());
}
@ -1234,10 +1154,10 @@ public class PartitioningSqlR4Test extends BaseJpaR4SystemTest {
SearchParameterMap map = SearchParameterMap.newSynchronous()
.add(Patient.SP_ACTIVE, new TokenParam("true"))
.add(Patient.SP_RES_ID, new TokenParam(patientId1.toUnqualifiedVersionless().getValue()));
.add(IAnyResource.SP_RES_ID, new TokenParam(patientId1.toUnqualifiedVersionless().getValue()));
IBundleProvider searchOutcome = myPatientDao.search(map, mySrd);
assertEquals(1, searchOutcome.size());
IIdType gotId1 = searchOutcome.getResources(0,1).get(0).getIdElement().toUnqualifiedVersionless();
IIdType gotId1 = searchOutcome.getResources(0, 1).get(0).getIdElement().toUnqualifiedVersionless();
assertEquals(patientId1, gotId1);
String searchSql = myCaptureQueriesListener.getSelectQueriesForCurrentThread().get(0).getSql(true, false);
@ -1254,7 +1174,7 @@ public class PartitioningSqlR4Test extends BaseJpaR4SystemTest {
SearchParameterMap map = SearchParameterMap.newSynchronous()
.add(Patient.SP_ACTIVE, new TokenParam("true"))
.add(Patient.SP_RES_ID, new TokenParam(patientIdNull.toUnqualifiedVersionless().getValue()));
.add(IAnyResource.SP_RES_ID, new TokenParam(patientIdNull.toUnqualifiedVersionless().getValue()));
IBundleProvider searchOutcome = myPatientDao.search(map, mySrd);
assertEquals(0, searchOutcome.size());
}
@ -1265,7 +1185,7 @@ public class PartitioningSqlR4Test extends BaseJpaR4SystemTest {
SearchParameterMap map = SearchParameterMap.newSynchronous()
.add(Patient.SP_ACTIVE, new TokenParam("true"))
.add(Patient.SP_RES_ID, new TokenParam(patientId2.toUnqualifiedVersionless().getValue()));
.add(IAnyResource.SP_RES_ID, new TokenParam(patientId2.toUnqualifiedVersionless().getValue()));
IBundleProvider searchOutcome = myPatientDao.search(map, mySrd);
assertEquals(0, searchOutcome.size());
}
@ -1291,10 +1211,10 @@ public class PartitioningSqlR4Test extends BaseJpaR4SystemTest {
myCaptureQueriesListener.clear();
addReadPartition(1);
SearchParameterMap map = SearchParameterMap.newSynchronous(Patient.SP_RES_ID, new TokenParam(patientId1.toUnqualifiedVersionless().getValue()));
SearchParameterMap map = SearchParameterMap.newSynchronous(IAnyResource.SP_RES_ID, new TokenParam(patientId1.toUnqualifiedVersionless().getValue()));
IBundleProvider searchOutcome = myPatientDao.search(map, mySrd);
assertEquals(1, searchOutcome.size());
IIdType gotId1 = searchOutcome.getResources(0,1).get(0).getIdElement().toUnqualifiedVersionless();
IIdType gotId1 = searchOutcome.getResources(0, 1).get(0).getIdElement().toUnqualifiedVersionless();
assertEquals(patientId1, gotId1);
String searchSql = myCaptureQueriesListener.getSelectQueriesForCurrentThread().get(0).getSql(true, false).toUpperCase();
@ -1309,7 +1229,7 @@ public class PartitioningSqlR4Test extends BaseJpaR4SystemTest {
{
addReadPartition(1);
SearchParameterMap map = SearchParameterMap.newSynchronous(Patient.SP_RES_ID, new TokenParam(patientIdNull.toUnqualifiedVersionless().getValue()));
SearchParameterMap map = SearchParameterMap.newSynchronous(IAnyResource.SP_RES_ID, new TokenParam(patientIdNull.toUnqualifiedVersionless().getValue()));
IBundleProvider searchOutcome = myPatientDao.search(map, mySrd);
assertEquals(0, searchOutcome.size());
}
@ -1318,7 +1238,7 @@ public class PartitioningSqlR4Test extends BaseJpaR4SystemTest {
{
addReadPartition(1);
SearchParameterMap map = SearchParameterMap.newSynchronous(Patient.SP_RES_ID, new TokenParam(patientId2.toUnqualifiedVersionless().getValue()));
SearchParameterMap map = SearchParameterMap.newSynchronous(IAnyResource.SP_RES_ID, new TokenParam(patientId2.toUnqualifiedVersionless().getValue()));
IBundleProvider searchOutcome = myPatientDao.search(map, mySrd);
assertEquals(0, searchOutcome.size());
}
@ -1346,10 +1266,10 @@ public class PartitioningSqlR4Test extends BaseJpaR4SystemTest {
SearchParameterMap map = SearchParameterMap.newSynchronous()
.add(Patient.SP_ACTIVE, new TokenParam("true"))
.add(Patient.SP_RES_ID, new TokenParam(patientId1.toUnqualifiedVersionless().getValue()));
.add(IAnyResource.SP_RES_ID, new TokenParam(patientId1.toUnqualifiedVersionless().getValue()));
IBundleProvider searchOutcome = myPatientDao.search(map, mySrd);
assertEquals(1, searchOutcome.size());
IIdType gotId1 = searchOutcome.getResources(0,1).get(0).getIdElement().toUnqualifiedVersionless();
IIdType gotId1 = searchOutcome.getResources(0, 1).get(0).getIdElement().toUnqualifiedVersionless();
assertEquals(patientId1, gotId1);
// First SQL resolves the forced ID
@ -1371,7 +1291,7 @@ public class PartitioningSqlR4Test extends BaseJpaR4SystemTest {
SearchParameterMap map = SearchParameterMap.newSynchronous()
.add(Patient.SP_ACTIVE, new TokenParam("true"))
.add(Patient.SP_RES_ID, new TokenParam(patientIdNull.toUnqualifiedVersionless().getValue()));
.add(IAnyResource.SP_RES_ID, new TokenParam(patientIdNull.toUnqualifiedVersionless().getValue()));
IBundleProvider searchOutcome = myPatientDao.search(map, mySrd);
assertEquals(0, searchOutcome.size());
}
@ -1382,7 +1302,7 @@ public class PartitioningSqlR4Test extends BaseJpaR4SystemTest {
SearchParameterMap map = SearchParameterMap.newSynchronous()
.add(Patient.SP_ACTIVE, new TokenParam("true"))
.add(Patient.SP_RES_ID, new TokenParam(patientId2.toUnqualifiedVersionless().getValue()));
.add(IAnyResource.SP_RES_ID, new TokenParam(patientId2.toUnqualifiedVersionless().getValue()));
IBundleProvider searchOutcome = myPatientDao.search(map, mySrd);
assertEquals(0, searchOutcome.size());
}
@ -1390,8 +1310,6 @@ public class PartitioningSqlR4Test extends BaseJpaR4SystemTest {
}
@Test
public void testSearch_MissingParamString_SearchAllPartitions() {
myPartitionSettings.setIncludePartitionInSearchHashes(false);
@ -2569,10 +2487,10 @@ public class PartitioningSqlR4Test extends BaseJpaR4SystemTest {
map.setLoadSynchronous(true);
IBundleProvider results = myObservationDao.search(map, mySrd);
List<IIdType> ids = toUnqualifiedVersionlessIds(results);
assertThat(ids, contains(observationId));
String searchSql = myCaptureQueriesListener.getSelectQueriesForCurrentThread().get(0).getSql(true, true);
ourLog.info("Search SQL:\n{}", searchSql);
assertThat(ids, contains(observationId)); // FIXME: move up
assertEquals(1, StringUtils.countMatches(searchSql, "forcedid0_.PARTITION_ID is null"), searchSql);
assertEquals(1, StringUtils.countMatches(searchSql, "forcedid0_.RESOURCE_TYPE='Patient'"), searchSql);
assertEquals(1, StringUtils.countMatches(searchSql, "PARTITION_ID"), searchSql);
@ -2948,147 +2866,5 @@ public class PartitioningSqlR4Test extends BaseJpaR4SystemTest {
}
}
private void createUniqueCompositeSp() {
SearchParameter sp = new SearchParameter();
sp.setId("SearchParameter/patient-birthdate");
sp.setType(Enumerations.SearchParamType.DATE);
sp.setCode("birthdate");
sp.setExpression("Patient.birthDate");
sp.setStatus(Enumerations.PublicationStatus.ACTIVE);
sp.addBase("Patient");
mySearchParameterDao.update(sp);
sp = new SearchParameter();
sp.setId("SearchParameter/patient-birthdate-unique");
sp.setType(Enumerations.SearchParamType.COMPOSITE);
sp.setStatus(Enumerations.PublicationStatus.ACTIVE);
sp.addBase("Patient");
sp.addComponent()
.setExpression("Patient")
.setDefinition("SearchParameter/patient-birthdate");
sp.addExtension()
.setUrl(HapiExtensions.EXT_SP_UNIQUE)
.setValue(new BooleanType(true));
mySearchParameterDao.update(sp);
mySearchParamRegistry.forceRefresh();
}
private void dropForcedIdUniqueConstraint() {
runInTransaction(() -> {
myEntityManager.createNativeQuery("alter table " + ForcedId.HFJ_FORCED_ID + " drop constraint " + ForcedId.IDX_FORCEDID_TYPE_FID).executeUpdate();
});
myHaveDroppedForcedIdUniqueConstraint = true;
}
private void addCreatePartition(Integer thePartitionId, LocalDate thePartitionDate) {
Validate.notNull(thePartitionId);
RequestPartitionId requestPartitionId = RequestPartitionId.fromPartitionId(thePartitionId, thePartitionDate);
myPartitionInterceptor.addCreatePartition(requestPartitionId);
}
private void addCreateDefaultPartition() {
myPartitionInterceptor.addCreatePartition(RequestPartitionId.defaultPartition());
}
private void addCreateDefaultPartition(LocalDate thePartitionDate) {
RequestPartitionId requestPartitionId = RequestPartitionId.fromPartitionId(null, thePartitionDate);
myPartitionInterceptor.addCreatePartition(requestPartitionId);
}
private void addReadPartition(Integer... thePartitionId) {
Validate.notNull(thePartitionId);
myPartitionInterceptor.addReadPartition(RequestPartitionId.fromPartitionIds(thePartitionId));
}
private void addReadPartitions(String... thePartitionNames) {
Validate.notNull(thePartitionNames);
Validate.isTrue(thePartitionNames.length > 0);
myPartitionInterceptor.addReadPartition(RequestPartitionId.fromPartitionNames(thePartitionNames));
}
private void addReadDefaultPartition() {
myPartitionInterceptor.addReadPartition(RequestPartitionId.defaultPartition());
}
private void addReadAllPartitions() {
myPartitionInterceptor.addReadPartition(RequestPartitionId.allPartitions());
}
public void createRequestId() {
when(mySrd.getRequestId()).thenReturn("REQUEST_ID");
}
private Consumer<IBaseResource> withPartition(Integer thePartitionId) {
return t -> {
if (thePartitionId != null) {
addCreatePartition(thePartitionId, null);
} else {
addCreateDefaultPartition();
}
};
}
private Consumer<IBaseResource> withPutPartition(Integer thePartitionId) {
return t -> {
if (thePartitionId != null) {
addReadPartition(thePartitionId);
addCreatePartition(thePartitionId, null);
} else {
addReadDefaultPartition();
addCreateDefaultPartition();
}
};
}
@Interceptor
public static class MyReadWriteInterceptor extends MyWriteInterceptor {
private final List<RequestPartitionId> myReadRequestPartitionIds = new ArrayList<>();
public void addReadPartition(RequestPartitionId theRequestPartitionId) {
myReadRequestPartitionIds.add(theRequestPartitionId);
}
@Hook(Pointcut.STORAGE_PARTITION_IDENTIFY_READ)
public RequestPartitionId PartitionIdentifyRead(ServletRequestDetails theRequestDetails) {
RequestPartitionId retVal = myReadRequestPartitionIds.remove(0);
ourLog.info("Returning partition for read: {}", retVal);
return retVal;
}
@Override
public void assertNoRemainingIds() {
super.assertNoRemainingIds();
assertEquals(0, myReadRequestPartitionIds.size());
}
}
@Interceptor
public static class MyWriteInterceptor {
private final List<RequestPartitionId> myCreateRequestPartitionIds = new ArrayList<>();
public void addCreatePartition(RequestPartitionId theRequestPartitionId) {
myCreateRequestPartitionIds.add(theRequestPartitionId);
}
@Hook(Pointcut.STORAGE_PARTITION_IDENTIFY_CREATE)
public RequestPartitionId PartitionIdentifyCreate(IBaseResource theResource, ServletRequestDetails theRequestDetails) {
assertNotNull(theResource);
RequestPartitionId retVal = myCreateRequestPartitionIds.remove(0);
ourLog.info("Returning partition for create: {}", retVal);
return retVal;
}
public void assertNoRemainingIds() {
assertEquals(0, myCreateRequestPartitionIds.size());
}
}
}

View File

@ -175,7 +175,7 @@ public class SearchCoordinatorSvcImplTest {
IResultIterator iter = new FailAfterNIterator(new SlowIterator(pids.iterator(), 2), 300);
when(mySearchBuilder.createQuery(same(params), any(), any(), nullable(RequestPartitionId.class))).thenReturn(iter);
IBundleProvider result = mySvc.registerSearch(myCallingDao, params, "Patient", new CacheControlDirective(), null);
IBundleProvider result = mySvc.registerSearch(myCallingDao, params, "Patient", new CacheControlDirective(), null, RequestPartitionId.allPartitions());
assertNotNull(result.getUuid());
assertEquals(null, result.size());
@ -219,7 +219,7 @@ public class SearchCoordinatorSvcImplTest {
// Do all the stubbing before starting any work, since we want to avoid threading issues
IBundleProvider result = mySvc.registerSearch(myCallingDao, params, "Patient", new CacheControlDirective(), null);
IBundleProvider result = mySvc.registerSearch(myCallingDao, params, "Patient", new CacheControlDirective(), null, RequestPartitionId.allPartitions());
assertNotNull(result.getUuid());
assertEquals(null, result.size());
@ -296,7 +296,7 @@ public class SearchCoordinatorSvcImplTest {
doAnswer(loadPids()).when(mySearchBuilder).loadResourcesByPid(any(Collection.class), any(Collection.class), any(List.class), anyBoolean(), any());
IBundleProvider result = mySvc.registerSearch(myCallingDao, params, "Patient", new CacheControlDirective(), null);
IBundleProvider result = mySvc.registerSearch(myCallingDao, params, "Patient", new CacheControlDirective(), null, RequestPartitionId.allPartitions());
assertNotNull(result.getUuid());
assertEquals(null, result.size());
@ -337,7 +337,7 @@ public class SearchCoordinatorSvcImplTest {
SlowIterator iter = new SlowIterator(pids.iterator(), 500);
when(mySearchBuilder.createQuery(same(params), any(), any(), nullable(RequestPartitionId.class))).thenReturn(iter);
IBundleProvider result = mySvc.registerSearch(myCallingDao, params, "Patient", new CacheControlDirective(), null);
IBundleProvider result = mySvc.registerSearch(myCallingDao, params, "Patient", new CacheControlDirective(), null, RequestPartitionId.allPartitions());
assertNotNull(result.getUuid());
CountDownLatch completionLatch = new CountDownLatch(1);
@ -389,7 +389,7 @@ public class SearchCoordinatorSvcImplTest {
});
doAnswer(loadPids()).when(mySearchBuilder).loadResourcesByPid(any(Collection.class), any(Collection.class), any(List.class), anyBoolean(), any());
IBundleProvider result = mySvc.registerSearch(myCallingDao, params, "Patient", new CacheControlDirective(), null);
IBundleProvider result = mySvc.registerSearch(myCallingDao, params, "Patient", new CacheControlDirective(), null, RequestPartitionId.allPartitions());
assertNotNull(result.getUuid());
assertEquals(null, result.size());
@ -436,7 +436,7 @@ public class SearchCoordinatorSvcImplTest {
doAnswer(loadPids()).when(mySearchBuilder).loadResourcesByPid(any(Collection.class), any(Collection.class), any(List.class), anyBoolean(), any());
IBundleProvider result = mySvc.registerSearch(myCallingDao, params, "Patient", new CacheControlDirective(), null);
IBundleProvider result = mySvc.registerSearch(myCallingDao, params, "Patient", new CacheControlDirective(), null, RequestPartitionId.allPartitions());
assertNotNull(result.getUuid());
assertEquals(90, Objects.requireNonNull(result.size()).intValue());
@ -544,7 +544,7 @@ public class SearchCoordinatorSvcImplTest {
doAnswer(loadPids()).when(mySearchBuilder).loadResourcesByPid(any(Collection.class), any(Collection.class), any(List.class), anyBoolean(), any());
IBundleProvider result = mySvc.registerSearch(myCallingDao, params, "Patient", new CacheControlDirective(), null);
IBundleProvider result = mySvc.registerSearch(myCallingDao, params, "Patient", new CacheControlDirective(), null, RequestPartitionId.allPartitions());
assertNull(result.getUuid());
assertEquals(790, Objects.requireNonNull(result.size()).intValue());
@ -571,7 +571,7 @@ public class SearchCoordinatorSvcImplTest {
doAnswer(loadPids()).when(mySearchBuilder).loadResourcesByPid(any(Collection.class), any(Collection.class), any(List.class), anyBoolean(), any());
IBundleProvider result = mySvc.registerSearch(myCallingDao, params, "Patient", new CacheControlDirective(), null);
IBundleProvider result = mySvc.registerSearch(myCallingDao, params, "Patient", new CacheControlDirective(), null, RequestPartitionId.allPartitions());
assertNull(result.getUuid());
assertEquals(20, result.size().intValue());
@ -595,7 +595,7 @@ public class SearchCoordinatorSvcImplTest {
pids = createPidSequence(110);
doAnswer(loadPids()).when(mySearchBuilder).loadResourcesByPid(eq(pids), any(Collection.class), any(List.class), anyBoolean(), nullable(RequestDetails.class));
IBundleProvider result = mySvc.registerSearch(myCallingDao, params, "Patient", new CacheControlDirective(), null);
IBundleProvider result = mySvc.registerSearch(myCallingDao, params, "Patient", new CacheControlDirective(), null, RequestPartitionId.allPartitions());
assertNull(result.getUuid());
assertEquals(100, Objects.requireNonNull(result.size()).intValue());

View File

@ -1,6 +1,7 @@
package ca.uhn.fhir.jpa.search.lastn.config;
import ca.uhn.fhir.context.ConfigurationException;
import ca.uhn.fhir.jpa.model.config.PartitionSettings;
import ca.uhn.fhir.jpa.search.lastn.ElasticsearchSvcImpl;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@ -21,9 +22,14 @@ public class TestElasticsearchConfig {
private static final String ELASTIC_VERSION = "6.5.4";
@Bean
public PartitionSettings partitionSettings() {
return new PartitionSettings();
}
@Bean()
public ElasticsearchSvcImpl myElasticsearchSvc() {
public ElasticsearchSvcImpl elasticsearchSvc() {
int elasticsearchPort = embeddedElasticSearch().getHttpPort();
return new ElasticsearchSvcImpl(elasticsearchHost, elasticsearchPort, elasticsearchUserId, elasticsearchPassword);
}
@ -49,7 +55,7 @@ public class TestElasticsearchConfig {
@PreDestroy
public void stop() throws IOException {
myElasticsearchSvc().close();
elasticsearchSvc().close();
embeddedElasticSearch().stop();
}

View File

@ -22,6 +22,7 @@ package ca.uhn.fhir.jpa.migrate.tasks;
import ca.uhn.fhir.interceptor.model.RequestPartitionId;
import ca.uhn.fhir.jpa.entity.EmpiLink;
import ca.uhn.fhir.jpa.entity.Search;
import ca.uhn.fhir.jpa.entity.TermCodeSystemVersion;
import ca.uhn.fhir.jpa.entity.TermConceptMap;
import ca.uhn.fhir.jpa.entity.TermValueSet;
@ -73,7 +74,7 @@ public class HapiFhirJpaMigrationTasks extends BaseMigrationTasks<VersionEnum> {
init430(); // Replaced by 5.0.0
init500(); // 20200218 - 20200513
init501(); // 20200514 - 20200515
init510(); // 20200516 - present
init510(); // 20200516 - 20201112
}
protected void init510() {

View File

@ -22,6 +22,7 @@ package ca.uhn.fhir.jpa.subscription.triggering;
import ca.uhn.fhir.context.FhirContext;
import ca.uhn.fhir.context.RuntimeResourceDefinition;
import ca.uhn.fhir.interceptor.model.RequestPartitionId;
import ca.uhn.fhir.jpa.api.config.DaoConfig;
import ca.uhn.fhir.jpa.api.dao.DaoRegistry;
import ca.uhn.fhir.jpa.api.dao.IFhirResourceDao;
@ -225,7 +226,7 @@ public class SubscriptionTriggeringSvcImpl implements ISubscriptionTriggeringSvc
ourLog.info("Triggering job[{}] is starting a search for {}", theJobDetails.getJobId(), nextSearchUrl);
IBundleProvider search = mySearchCoordinatorSvc.registerSearch(callingDao, params, resourceType, new CacheControlDirective(), null);
IBundleProvider search = mySearchCoordinatorSvc.registerSearch(callingDao, params, resourceType, new CacheControlDirective(), null, RequestPartitionId.allPartitions());
theJobDetails.setCurrentSearchUuid(search.getUuid());
theJobDetails.setCurrentSearchResourceType(resourceType);
theJobDetails.setCurrentSearchCount(params.getCount());