Performance enhancements to the JPA server and better retry logic in
$trigger-subscription
This commit is contained in:
parent
aa177c1421
commit
f601b212ad
|
@ -104,8 +104,30 @@ public abstract class BaseConfig implements SchedulingConfigurer {
|
||||||
retVal.put(AvailableSettings.CONNECTION_HANDLING, PhysicalConnectionHandlingMode.DELAYED_ACQUISITION_AND_HOLD);
|
retVal.put(AvailableSettings.CONNECTION_HANDLING, PhysicalConnectionHandlingMode.DELAYED_ACQUISITION_AND_HOLD);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/*
|
||||||
|
* Set some performance options
|
||||||
|
*/
|
||||||
|
|
||||||
|
if (!retVal.containsKey(AvailableSettings.STATEMENT_BATCH_SIZE)) {
|
||||||
|
retVal.put(AvailableSettings.STATEMENT_BATCH_SIZE, "30");
|
||||||
|
}
|
||||||
|
|
||||||
|
if (!retVal.containsKey(AvailableSettings.ORDER_INSERTS)) {
|
||||||
|
retVal.put(AvailableSettings.ORDER_INSERTS, "true");
|
||||||
|
}
|
||||||
|
|
||||||
|
if (!retVal.containsKey(AvailableSettings.ORDER_UPDATES)) {
|
||||||
|
retVal.put(AvailableSettings.ORDER_UPDATES, "true");
|
||||||
|
}
|
||||||
|
|
||||||
|
if (!retVal.containsKey(AvailableSettings.BATCH_VERSIONED_DATA)) {
|
||||||
|
retVal.put(AvailableSettings.BATCH_VERSIONED_DATA, "true");
|
||||||
|
}
|
||||||
|
|
||||||
return retVal;
|
return retVal;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
};
|
};
|
||||||
configureEntityManagerFactory(retVal, fhirContext());
|
configureEntityManagerFactory(retVal, fhirContext());
|
||||||
return retVal;
|
return retVal;
|
||||||
|
|
|
@ -64,7 +64,6 @@ import org.hibernate.ScrollableResults;
|
||||||
import org.hibernate.query.Query;
|
import org.hibernate.query.Query;
|
||||||
import org.hibernate.query.criteria.internal.CriteriaBuilderImpl;
|
import org.hibernate.query.criteria.internal.CriteriaBuilderImpl;
|
||||||
import org.hibernate.query.criteria.internal.predicate.BooleanStaticAssertionPredicate;
|
import org.hibernate.query.criteria.internal.predicate.BooleanStaticAssertionPredicate;
|
||||||
import org.hl7.fhir.dstu3.model.BaseResource;
|
|
||||||
import org.hl7.fhir.instance.model.api.IAnyResource;
|
import org.hl7.fhir.instance.model.api.IAnyResource;
|
||||||
import org.hl7.fhir.instance.model.api.IBaseResource;
|
import org.hl7.fhir.instance.model.api.IBaseResource;
|
||||||
import org.hl7.fhir.instance.model.api.IIdType;
|
import org.hl7.fhir.instance.model.api.IIdType;
|
||||||
|
@ -88,7 +87,7 @@ import static org.apache.commons.lang3.StringUtils.*;
|
||||||
@SuppressWarnings("JpaQlInspection")
|
@SuppressWarnings("JpaQlInspection")
|
||||||
public class SearchBuilder implements ISearchBuilder {
|
public class SearchBuilder implements ISearchBuilder {
|
||||||
|
|
||||||
private static final List<Long> EMPTY_LONG_LIST = Collections.unmodifiableList(new ArrayList<Long>());
|
private static final List<Long> EMPTY_LONG_LIST = Collections.unmodifiableList(new ArrayList<>());
|
||||||
private static final org.slf4j.Logger ourLog = org.slf4j.LoggerFactory.getLogger(SearchBuilder.class);
|
private static final org.slf4j.Logger ourLog = org.slf4j.LoggerFactory.getLogger(SearchBuilder.class);
|
||||||
private static Long NO_MORE = -1L;
|
private static Long NO_MORE = -1L;
|
||||||
private static HandlerTypeEnum ourLastHandlerMechanismForUnitTest;
|
private static HandlerTypeEnum ourLastHandlerMechanismForUnitTest;
|
||||||
|
@ -96,7 +95,7 @@ public class SearchBuilder implements ISearchBuilder {
|
||||||
private static String ourLastHandlerThreadForUnitTest;
|
private static String ourLastHandlerThreadForUnitTest;
|
||||||
private static boolean ourTrackHandlersForUnitTest;
|
private static boolean ourTrackHandlersForUnitTest;
|
||||||
protected IResourceTagDao myResourceTagDao;
|
protected IResourceTagDao myResourceTagDao;
|
||||||
protected IResourceSearchViewDao myResourceSearchViewDao;
|
private IResourceSearchViewDao myResourceSearchViewDao;
|
||||||
private List<Long> myAlsoIncludePids;
|
private List<Long> myAlsoIncludePids;
|
||||||
private CriteriaBuilder myBuilder;
|
private CriteriaBuilder myBuilder;
|
||||||
private BaseHapiFhirDao<?> myCallingDao;
|
private BaseHapiFhirDao<?> myCallingDao;
|
||||||
|
@ -122,7 +121,7 @@ public class SearchBuilder implements ISearchBuilder {
|
||||||
/**
|
/**
|
||||||
* Constructor
|
* Constructor
|
||||||
*/
|
*/
|
||||||
public SearchBuilder(FhirContext theFhirContext, EntityManager theEntityManager,
|
SearchBuilder(FhirContext theFhirContext, EntityManager theEntityManager,
|
||||||
IFulltextSearchSvc theFulltextSearchSvc, BaseHapiFhirDao<?> theDao,
|
IFulltextSearchSvc theFulltextSearchSvc, BaseHapiFhirDao<?> theDao,
|
||||||
IResourceIndexedSearchParamUriDao theResourceIndexedSearchParamUriDao, IForcedIdDao theForcedIdDao,
|
IResourceIndexedSearchParamUriDao theResourceIndexedSearchParamUriDao, IForcedIdDao theForcedIdDao,
|
||||||
IHapiTerminologySvc theTerminologySvc, ISearchParamRegistry theSearchParamRegistry,
|
IHapiTerminologySvc theTerminologySvc, ISearchParamRegistry theSearchParamRegistry,
|
||||||
|
@ -175,8 +174,7 @@ public class SearchBuilder implements ISearchBuilder {
|
||||||
|
|
||||||
List<Predicate> codePredicates = new ArrayList<>();
|
List<Predicate> codePredicates = new ArrayList<>();
|
||||||
for (IQueryParameterType nextOr : theList) {
|
for (IQueryParameterType nextOr : theList) {
|
||||||
IQueryParameterType params = nextOr;
|
Predicate p = createPredicateDate(nextOr, theResourceName, theParamName, myBuilder, join);
|
||||||
Predicate p = createPredicateDate(params, theResourceName, theParamName, myBuilder, join);
|
|
||||||
codePredicates.add(p);
|
codePredicates.add(p);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -216,6 +214,7 @@ public class SearchBuilder implements ISearchBuilder {
|
||||||
throw new InvalidRequestException("Invalid resource type: " + targetResourceType);
|
throw new InvalidRequestException("Invalid resource type: " + targetResourceType);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
assert parameterName != null;
|
||||||
String paramName = parameterName.replaceAll("\\..*", "");
|
String paramName = parameterName.replaceAll("\\..*", "");
|
||||||
RuntimeSearchParam owningParameterDef = myCallingDao.getSearchParamByName(targetResourceDefinition, paramName);
|
RuntimeSearchParam owningParameterDef = myCallingDao.getSearchParamByName(targetResourceDefinition, paramName);
|
||||||
if (owningParameterDef == null) {
|
if (owningParameterDef == null) {
|
||||||
|
@ -244,7 +243,7 @@ public class SearchBuilder implements ISearchBuilder {
|
||||||
private void addPredicateLanguage(List<List<? extends IQueryParameterType>> theList) {
|
private void addPredicateLanguage(List<List<? extends IQueryParameterType>> theList) {
|
||||||
for (List<? extends IQueryParameterType> nextList : theList) {
|
for (List<? extends IQueryParameterType> nextList : theList) {
|
||||||
|
|
||||||
Set<String> values = new HashSet<String>();
|
Set<String> values = new HashSet<>();
|
||||||
for (IQueryParameterType next : nextList) {
|
for (IQueryParameterType next : nextList) {
|
||||||
if (next instanceof StringParam) {
|
if (next instanceof StringParam) {
|
||||||
String nextValue = ((StringParam) next).getValue();
|
String nextValue = ((StringParam) next).getValue();
|
||||||
|
@ -265,7 +264,6 @@ public class SearchBuilder implements ISearchBuilder {
|
||||||
myPredicates.add(predicate);
|
myPredicates.add(predicate);
|
||||||
}
|
}
|
||||||
|
|
||||||
return;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
private void addPredicateNumber(String theResourceName, String theParamName, List<? extends IQueryParameterType> theList) {
|
private void addPredicateNumber(String theResourceName, String theParamName, List<? extends IQueryParameterType> theList) {
|
||||||
|
@ -279,10 +277,9 @@ public class SearchBuilder implements ISearchBuilder {
|
||||||
|
|
||||||
List<Predicate> codePredicates = new ArrayList<>();
|
List<Predicate> codePredicates = new ArrayList<>();
|
||||||
for (IQueryParameterType nextOr : theList) {
|
for (IQueryParameterType nextOr : theList) {
|
||||||
IQueryParameterType params = nextOr;
|
|
||||||
|
|
||||||
if (params instanceof NumberParam) {
|
if (nextOr instanceof NumberParam) {
|
||||||
NumberParam param = (NumberParam) params;
|
NumberParam param = (NumberParam) nextOr;
|
||||||
|
|
||||||
BigDecimal value = param.getValue();
|
BigDecimal value = param.getValue();
|
||||||
if (value == null) {
|
if (value == null) {
|
||||||
|
@ -293,12 +290,12 @@ public class SearchBuilder implements ISearchBuilder {
|
||||||
ParamPrefixEnum prefix = ObjectUtils.defaultIfNull(param.getPrefix(), ParamPrefixEnum.EQUAL);
|
ParamPrefixEnum prefix = ObjectUtils.defaultIfNull(param.getPrefix(), ParamPrefixEnum.EQUAL);
|
||||||
String invalidMessageName = "invalidNumberPrefix";
|
String invalidMessageName = "invalidNumberPrefix";
|
||||||
|
|
||||||
Predicate predicateNumeric = createPredicateNumeric(theResourceName, theParamName, join, myBuilder, params, prefix, value, fromObj, invalidMessageName);
|
Predicate predicateNumeric = createPredicateNumeric(theResourceName, theParamName, join, myBuilder, nextOr, prefix, value, fromObj, invalidMessageName);
|
||||||
Predicate predicateOuter = combineParamIndexPredicateWithParamNamePredicate(theResourceName, theParamName, join, predicateNumeric);
|
Predicate predicateOuter = combineParamIndexPredicateWithParamNamePredicate(theResourceName, theParamName, join, predicateNumeric);
|
||||||
codePredicates.add(predicateOuter);
|
codePredicates.add(predicateOuter);
|
||||||
|
|
||||||
} else {
|
} else {
|
||||||
throw new IllegalArgumentException("Invalid token type: " + params.getClass());
|
throw new IllegalArgumentException("Invalid token type: " + nextOr.getClass());
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
@ -392,7 +389,7 @@ public class SearchBuilder implements ISearchBuilder {
|
||||||
|
|
||||||
final List<Class<? extends IBaseResource>> resourceTypes;
|
final List<Class<? extends IBaseResource>> resourceTypes;
|
||||||
String resourceId;
|
String resourceId;
|
||||||
if (!ref.getValue().matches("[a-zA-Z]+\\/.*")) {
|
if (!ref.getValue().matches("[a-zA-Z]+/.*")) {
|
||||||
|
|
||||||
RuntimeSearchParam param = mySearchParamRegistry.getActiveSearchParam(theResourceName, theParamName);
|
RuntimeSearchParam param = mySearchParamRegistry.getActiveSearchParam(theResourceName, theParamName);
|
||||||
resourceTypes = new ArrayList<>();
|
resourceTypes = new ArrayList<>();
|
||||||
|
@ -904,6 +901,7 @@ public class SearchBuilder implements ISearchBuilder {
|
||||||
case NUMBER:
|
case NUMBER:
|
||||||
case REFERENCE:
|
case REFERENCE:
|
||||||
case URI:
|
case URI:
|
||||||
|
case SPECIAL:
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1590,6 +1588,7 @@ public class SearchBuilder implements ISearchBuilder {
|
||||||
sortAttrName = new String[]{"myValue"};
|
sortAttrName = new String[]{"myValue"};
|
||||||
joinType = JoinEnum.QUANTITY;
|
joinType = JoinEnum.QUANTITY;
|
||||||
break;
|
break;
|
||||||
|
case SPECIAL:
|
||||||
case COMPOSITE:
|
case COMPOSITE:
|
||||||
case HAS:
|
case HAS:
|
||||||
default:
|
default:
|
||||||
|
@ -2022,6 +2021,7 @@ public class SearchBuilder implements ISearchBuilder {
|
||||||
}
|
}
|
||||||
break;
|
break;
|
||||||
case HAS:
|
case HAS:
|
||||||
|
case SPECIAL:
|
||||||
// should not happen
|
// should not happen
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
@ -2076,6 +2076,7 @@ public class SearchBuilder implements ISearchBuilder {
|
||||||
case REFERENCE:
|
case REFERENCE:
|
||||||
qp = new ReferenceParam();
|
qp = new ReferenceParam();
|
||||||
break;
|
break;
|
||||||
|
case SPECIAL:
|
||||||
case URI:
|
case URI:
|
||||||
case HAS:
|
case HAS:
|
||||||
default:
|
default:
|
||||||
|
@ -2114,7 +2115,7 @@ public class SearchBuilder implements ISearchBuilder {
|
||||||
private Long myNext;
|
private Long myNext;
|
||||||
private int myPageSize = myCallingDao.getConfig().getEverythingIncludesFetchPageSize();
|
private int myPageSize = myCallingDao.getConfig().getEverythingIncludesFetchPageSize();
|
||||||
|
|
||||||
public IncludesIterator(Set<Long> thePidSet) {
|
IncludesIterator(Set<Long> thePidSet) {
|
||||||
myCurrentPids = new ArrayList<>(thePidSet);
|
myCurrentPids = new ArrayList<>(thePidSet);
|
||||||
myCurrentIterator = EMPTY_LONG_LIST.iterator();
|
myCurrentIterator = EMPTY_LONG_LIST.iterator();
|
||||||
myCurrentOffset = 0;
|
myCurrentOffset = 0;
|
||||||
|
@ -2128,7 +2129,6 @@ public class SearchBuilder implements ISearchBuilder {
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (!myCurrentIterator.hasNext()) {
|
|
||||||
int start = myCurrentOffset;
|
int start = myCurrentOffset;
|
||||||
int end = myCurrentOffset + myPageSize;
|
int end = myCurrentOffset + myPageSize;
|
||||||
if (end > myCurrentPids.size()) {
|
if (end > myCurrentPids.size()) {
|
||||||
|
@ -2143,7 +2143,6 @@ public class SearchBuilder implements ISearchBuilder {
|
||||||
Set<Include> includes = Collections.singleton(new Include("*", true));
|
Set<Include> includes = Collections.singleton(new Include("*", true));
|
||||||
Set<Long> newPids = loadIncludes(myCallingDao, myContext, myEntityManager, pidsToScan, includes, false, myParams.getLastUpdated());
|
Set<Long> newPids = loadIncludes(myCallingDao, myContext, myEntityManager, pidsToScan, includes, false, myParams.getLastUpdated());
|
||||||
myCurrentIterator = newPids.iterator();
|
myCurrentIterator = newPids.iterator();
|
||||||
}
|
|
||||||
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -2151,7 +2150,7 @@ public class SearchBuilder implements ISearchBuilder {
|
||||||
@Override
|
@Override
|
||||||
public boolean hasNext() {
|
public boolean hasNext() {
|
||||||
fetchNext();
|
fetchNext();
|
||||||
return myNext != NO_MORE;
|
return !NO_MORE.equals(myNext);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -2268,7 +2267,7 @@ public class SearchBuilder implements ISearchBuilder {
|
||||||
myFirst = false;
|
myFirst = false;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (myNext == NO_MORE) {
|
if (NO_MORE.equals(myNext)) {
|
||||||
ourLog.debug("Query found {} matches in {}ms for query {}", myPidSet.size(), myStopwatch.getMillis(), mySearchUuid);
|
ourLog.debug("Query found {} matches in {}ms for query {}", myPidSet.size(), myStopwatch.getMillis(), mySearchUuid);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -2279,7 +2278,7 @@ public class SearchBuilder implements ISearchBuilder {
|
||||||
if (myNext == null) {
|
if (myNext == null) {
|
||||||
fetchNext();
|
fetchNext();
|
||||||
}
|
}
|
||||||
return myNext != NO_MORE;
|
return !NO_MORE.equals(myNext);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -2287,7 +2286,7 @@ public class SearchBuilder implements ISearchBuilder {
|
||||||
fetchNext();
|
fetchNext();
|
||||||
Long retVal = myNext;
|
Long retVal = myNext;
|
||||||
myNext = null;
|
myNext = null;
|
||||||
Validate.isTrue(retVal != NO_MORE, "No more elements");
|
Validate.isTrue(!NO_MORE.equals(retVal), "No more elements");
|
||||||
return retVal;
|
return retVal;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -2301,7 +2300,7 @@ public class SearchBuilder implements ISearchBuilder {
|
||||||
private final Set<String> myUniqueQueryStrings;
|
private final Set<String> myUniqueQueryStrings;
|
||||||
private Iterator<Long> myWrap = null;
|
private Iterator<Long> myWrap = null;
|
||||||
|
|
||||||
public UniqueIndexIterator(Set<String> theUniqueQueryStrings) {
|
UniqueIndexIterator(Set<String> theUniqueQueryStrings) {
|
||||||
myUniqueQueryStrings = theUniqueQueryStrings;
|
myUniqueQueryStrings = theUniqueQueryStrings;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -2343,7 +2342,7 @@ public class SearchBuilder implements ISearchBuilder {
|
||||||
private boolean myCountLoaded;
|
private boolean myCountLoaded;
|
||||||
private Long myCount;
|
private Long myCount;
|
||||||
|
|
||||||
public CountQueryIterator(TypedQuery<Long> theQuery) {
|
CountQueryIterator(TypedQuery<Long> theQuery) {
|
||||||
myQuery = theQuery;
|
myQuery = theQuery;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -2374,7 +2373,7 @@ public class SearchBuilder implements ISearchBuilder {
|
||||||
private final JoinEnum myJoinType;
|
private final JoinEnum myJoinType;
|
||||||
private final String myParamName;
|
private final String myParamName;
|
||||||
|
|
||||||
public JoinKey(String theParamName, JoinEnum theJoinType) {
|
JoinKey(String theParamName, JoinEnum theJoinType) {
|
||||||
super();
|
super();
|
||||||
myParamName = theParamName;
|
myParamName = theParamName;
|
||||||
myJoinType = theJoinType;
|
myJoinType = theJoinType;
|
||||||
|
@ -2382,6 +2381,9 @@ public class SearchBuilder implements ISearchBuilder {
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public boolean equals(Object theObj) {
|
public boolean equals(Object theObj) {
|
||||||
|
if (!(theObj instanceof JoinKey)) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
JoinKey obj = (JoinKey) theObj;
|
JoinKey obj = (JoinKey) theObj;
|
||||||
return new EqualsBuilder()
|
return new EqualsBuilder()
|
||||||
.append(myParamName, obj.myParamName)
|
.append(myParamName, obj.myParamName)
|
||||||
|
@ -2482,8 +2484,7 @@ public class SearchBuilder implements ISearchBuilder {
|
||||||
cq.where(SearchBuilder.toArray(lastUpdatedPredicates));
|
cq.where(SearchBuilder.toArray(lastUpdatedPredicates));
|
||||||
TypedQuery<Long> query = theEntityManager.createQuery(cq);
|
TypedQuery<Long> query = theEntityManager.createQuery(cq);
|
||||||
|
|
||||||
List<Long> resultList = query.getResultList();
|
return query.getResultList();
|
||||||
return resultList;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@VisibleForTesting
|
@VisibleForTesting
|
||||||
|
@ -2504,8 +2505,8 @@ public class SearchBuilder implements ISearchBuilder {
|
||||||
ourTrackHandlersForUnitTest = true;
|
ourTrackHandlersForUnitTest = true;
|
||||||
}
|
}
|
||||||
|
|
||||||
static Predicate[] toArray(List<Predicate> thePredicates) {
|
private static Predicate[] toArray(List<Predicate> thePredicates) {
|
||||||
return thePredicates.toArray(new Predicate[thePredicates.size()]);
|
return thePredicates.toArray(new Predicate[0]);
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -46,7 +46,9 @@ import ca.uhn.fhir.util.StopWatch;
|
||||||
import ca.uhn.fhir.util.ValidateUtil;
|
import ca.uhn.fhir.util.ValidateUtil;
|
||||||
import org.apache.commons.lang3.ObjectUtils;
|
import org.apache.commons.lang3.ObjectUtils;
|
||||||
import org.apache.commons.lang3.Validate;
|
import org.apache.commons.lang3.Validate;
|
||||||
|
import org.apache.commons.lang3.concurrent.BasicThreadFactory;
|
||||||
import org.apache.commons.lang3.time.DateUtils;
|
import org.apache.commons.lang3.time.DateUtils;
|
||||||
|
import org.apache.commons.lang3.tuple.Pair;
|
||||||
import org.hl7.fhir.instance.model.IdType;
|
import org.hl7.fhir.instance.model.IdType;
|
||||||
import org.hl7.fhir.instance.model.api.IBaseParameters;
|
import org.hl7.fhir.instance.model.api.IBaseParameters;
|
||||||
import org.hl7.fhir.instance.model.api.IBaseResource;
|
import org.hl7.fhir.instance.model.api.IBaseResource;
|
||||||
|
@ -62,7 +64,7 @@ import org.springframework.scheduling.annotation.Scheduled;
|
||||||
|
|
||||||
import javax.annotation.PostConstruct;
|
import javax.annotation.PostConstruct;
|
||||||
import java.util.*;
|
import java.util.*;
|
||||||
import java.util.concurrent.TimeUnit;
|
import java.util.concurrent.*;
|
||||||
import java.util.stream.Collectors;
|
import java.util.stream.Collectors;
|
||||||
|
|
||||||
import static org.apache.commons.lang3.StringUtils.isBlank;
|
import static org.apache.commons.lang3.StringUtils.isBlank;
|
||||||
|
@ -84,6 +86,7 @@ public class SubscriptionTriggeringProvider implements IResourceProvider, Applic
|
||||||
@Autowired
|
@Autowired
|
||||||
private ISearchCoordinatorSvc mySearchCoordinatorSvc;
|
private ISearchCoordinatorSvc mySearchCoordinatorSvc;
|
||||||
private ApplicationContext myAppCtx;
|
private ApplicationContext myAppCtx;
|
||||||
|
private ExecutorService myExecutorService;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Sets the maximum number of resources that will be submitted in a single pass
|
* Sets the maximum number of resources that will be submitted in a single pass
|
||||||
|
@ -105,6 +108,37 @@ public class SubscriptionTriggeringProvider implements IResourceProvider, Applic
|
||||||
Collection values1 = myAppCtx.getBeansOfType(BaseSubscriptionInterceptor.class).values();
|
Collection values1 = myAppCtx.getBeansOfType(BaseSubscriptionInterceptor.class).values();
|
||||||
Collection<BaseSubscriptionInterceptor<?>> values = (Collection<BaseSubscriptionInterceptor<?>>) values1;
|
Collection<BaseSubscriptionInterceptor<?>> values = (Collection<BaseSubscriptionInterceptor<?>>) values1;
|
||||||
mySubscriptionInterceptorList.addAll(values);
|
mySubscriptionInterceptorList.addAll(values);
|
||||||
|
|
||||||
|
|
||||||
|
LinkedBlockingQueue<Runnable> executorQueue = new LinkedBlockingQueue<>(1000);
|
||||||
|
BasicThreadFactory threadFactory = new BasicThreadFactory.Builder()
|
||||||
|
.namingPattern("SubscriptionTriggering-%d")
|
||||||
|
.daemon(false)
|
||||||
|
.priority(Thread.NORM_PRIORITY)
|
||||||
|
.build();
|
||||||
|
RejectedExecutionHandler rejectedExecutionHandler = new RejectedExecutionHandler() {
|
||||||
|
@Override
|
||||||
|
public void rejectedExecution(Runnable theRunnable, ThreadPoolExecutor theExecutor) {
|
||||||
|
ourLog.info("Note: Subscription triggering queue is full ({} elements), waiting for a slot to become available!", executorQueue.size());
|
||||||
|
StopWatch sw = new StopWatch();
|
||||||
|
try {
|
||||||
|
executorQueue.put(theRunnable);
|
||||||
|
} catch (InterruptedException theE) {
|
||||||
|
throw new RejectedExecutionException("Task " + theRunnable.toString() +
|
||||||
|
" rejected from " + theE.toString());
|
||||||
|
}
|
||||||
|
ourLog.info("Slot become available after {}ms", sw.getMillis());
|
||||||
|
}
|
||||||
|
};
|
||||||
|
myExecutorService = new ThreadPoolExecutor(
|
||||||
|
0,
|
||||||
|
10,
|
||||||
|
0L,
|
||||||
|
TimeUnit.MILLISECONDS,
|
||||||
|
executorQueue,
|
||||||
|
threadFactory,
|
||||||
|
rejectedExecutionHandler);
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Operation(name = JpaConstants.OPERATION_TRIGGER_SUBSCRIPTION)
|
@Operation(name = JpaConstants.OPERATION_TRIGGER_SUBSCRIPTION)
|
||||||
|
@ -227,10 +261,17 @@ public class SubscriptionTriggeringProvider implements IResourceProvider, Applic
|
||||||
|
|
||||||
// Submit individual resources
|
// Submit individual resources
|
||||||
int totalSubmitted = 0;
|
int totalSubmitted = 0;
|
||||||
|
List<Pair<String, Future<Void>>> futures = new ArrayList<>();
|
||||||
while (theJobDetails.getRemainingResourceIds().size() > 0 && totalSubmitted < myMaxSubmitPerPass) {
|
while (theJobDetails.getRemainingResourceIds().size() > 0 && totalSubmitted < myMaxSubmitPerPass) {
|
||||||
totalSubmitted++;
|
totalSubmitted++;
|
||||||
String nextResourceId = theJobDetails.getRemainingResourceIds().remove(0);
|
String nextResourceId = theJobDetails.getRemainingResourceIds().remove(0);
|
||||||
submitResource(theJobDetails.getSubscriptionId(), nextResourceId);
|
Future<Void> future = submitResource(theJobDetails.getSubscriptionId(), nextResourceId);
|
||||||
|
futures.add(Pair.of(nextResourceId, future));
|
||||||
|
}
|
||||||
|
|
||||||
|
// Make sure these all succeeded in submitting
|
||||||
|
if (validateFuturesAndReturnTrueIfWeShouldAbort(futures)) {
|
||||||
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
// If we don't have an active search started, and one needs to be.. start it
|
// If we don't have an active search started, and one needs to be.. start it
|
||||||
|
@ -267,14 +308,22 @@ public class SubscriptionTriggeringProvider implements IResourceProvider, Applic
|
||||||
List<Long> resourceIds = mySearchCoordinatorSvc.getResources(theJobDetails.getCurrentSearchUuid(), fromIndex, toIndex);
|
List<Long> resourceIds = mySearchCoordinatorSvc.getResources(theJobDetails.getCurrentSearchUuid(), fromIndex, toIndex);
|
||||||
|
|
||||||
ourLog.info("Triggering job[{}] delivering {} resources", theJobDetails.getJobId(), theJobDetails.getCurrentSearchUuid(), fromIndex, toIndex);
|
ourLog.info("Triggering job[{}] delivering {} resources", theJobDetails.getJobId(), theJobDetails.getCurrentSearchUuid(), fromIndex, toIndex);
|
||||||
|
int highestIndexSubmitted = theJobDetails.getCurrentSearchLastUploadedIndex();
|
||||||
|
|
||||||
for (Long next : resourceIds) {
|
for (Long next : resourceIds) {
|
||||||
IBaseResource nextResource = resourceDao.readByPid(next);
|
IBaseResource nextResource = resourceDao.readByPid(next);
|
||||||
submitResource(theJobDetails.getSubscriptionId(), nextResource);
|
Future<Void> future = submitResource(theJobDetails.getSubscriptionId(), nextResource);
|
||||||
|
futures.add(Pair.of(nextResource.getIdElement().getIdPart(), future));
|
||||||
totalSubmitted++;
|
totalSubmitted++;
|
||||||
theJobDetails.setCurrentSearchLastUploadedIndex(theJobDetails.getCurrentSearchLastUploadedIndex()+1);
|
highestIndexSubmitted++;
|
||||||
}
|
}
|
||||||
|
|
||||||
int expectedCount = toIndex - fromIndex;
|
if (validateFuturesAndReturnTrueIfWeShouldAbort(futures)) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
theJobDetails.setCurrentSearchLastUploadedIndex(highestIndexSubmitted);
|
||||||
|
|
||||||
if (resourceIds.size() == 0 || (theJobDetails.getCurrentSearchCount() != null && toIndex >= theJobDetails.getCurrentSearchCount())) {
|
if (resourceIds.size() == 0 || (theJobDetails.getCurrentSearchCount() != null && toIndex >= theJobDetails.getCurrentSearchCount())) {
|
||||||
ourLog.info("Triggering job[{}] search {} has completed ", theJobDetails.getJobId(), theJobDetails.getCurrentSearchUuid());
|
ourLog.info("Triggering job[{}] search {} has completed ", theJobDetails.getJobId(), theJobDetails.getCurrentSearchUuid());
|
||||||
theJobDetails.setCurrentSearchResourceType(null);
|
theJobDetails.setCurrentSearchResourceType(null);
|
||||||
|
@ -287,15 +336,34 @@ public class SubscriptionTriggeringProvider implements IResourceProvider, Applic
|
||||||
ourLog.info("Subscription trigger job[{}] triggered {} resources in {}ms ({} res / second)", theJobDetails.getJobId(), totalSubmitted, sw.getMillis(), sw.getThroughput(totalSubmitted, TimeUnit.SECONDS));
|
ourLog.info("Subscription trigger job[{}] triggered {} resources in {}ms ({} res / second)", theJobDetails.getJobId(), totalSubmitted, sw.getMillis(), sw.getThroughput(totalSubmitted, TimeUnit.SECONDS));
|
||||||
}
|
}
|
||||||
|
|
||||||
private void submitResource(String theSubscriptionId, String theResourceIdToTrigger) {
|
private boolean validateFuturesAndReturnTrueIfWeShouldAbort(List<Pair<String, Future<Void>>> theIdToFutures) {
|
||||||
|
|
||||||
|
for (Pair<String, Future<Void>> next : theIdToFutures) {
|
||||||
|
String nextDeliveredId = next.getKey();
|
||||||
|
try {
|
||||||
|
Future<Void> nextFuture = next.getValue();
|
||||||
|
nextFuture.get();
|
||||||
|
ourLog.info("Finished redelivering {}", nextDeliveredId);
|
||||||
|
} catch (Exception e) {
|
||||||
|
ourLog.error("Failure triggering resource " + nextDeliveredId, e);
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Clear the list since it will potentially get reused
|
||||||
|
theIdToFutures.clear();
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
private Future<Void> submitResource(String theSubscriptionId, String theResourceIdToTrigger) {
|
||||||
org.hl7.fhir.r4.model.IdType resourceId = new org.hl7.fhir.r4.model.IdType(theResourceIdToTrigger);
|
org.hl7.fhir.r4.model.IdType resourceId = new org.hl7.fhir.r4.model.IdType(theResourceIdToTrigger);
|
||||||
IFhirResourceDao<? extends IBaseResource> dao = myDaoRegistry.getResourceDao(resourceId.getResourceType());
|
IFhirResourceDao<? extends IBaseResource> dao = myDaoRegistry.getResourceDao(resourceId.getResourceType());
|
||||||
IBaseResource resourceToTrigger = dao.read(resourceId);
|
IBaseResource resourceToTrigger = dao.read(resourceId);
|
||||||
|
|
||||||
submitResource(theSubscriptionId, resourceToTrigger);
|
return submitResource(theSubscriptionId, resourceToTrigger);
|
||||||
}
|
}
|
||||||
|
|
||||||
private void submitResource(String theSubscriptionId, IBaseResource theResourceToTrigger) {
|
private Future<Void> submitResource(String theSubscriptionId, IBaseResource theResourceToTrigger) {
|
||||||
|
|
||||||
ourLog.info("Submitting resource {} to subscription {}", theResourceToTrigger.getIdElement().toUnqualifiedVersionless().getValue(), theSubscriptionId);
|
ourLog.info("Submitting resource {} to subscription {}", theResourceToTrigger.getIdElement().toUnqualifiedVersionless().getValue(), theSubscriptionId);
|
||||||
|
|
||||||
|
@ -305,9 +373,13 @@ public class SubscriptionTriggeringProvider implements IResourceProvider, Applic
|
||||||
msg.setSubscriptionId(new IdType(theSubscriptionId).toUnqualifiedVersionless().getValue());
|
msg.setSubscriptionId(new IdType(theSubscriptionId).toUnqualifiedVersionless().getValue());
|
||||||
msg.setNewPayload(myFhirContext, theResourceToTrigger);
|
msg.setNewPayload(myFhirContext, theResourceToTrigger);
|
||||||
|
|
||||||
|
return myExecutorService.submit(()->{
|
||||||
for (BaseSubscriptionInterceptor<?> next : mySubscriptionInterceptorList) {
|
for (BaseSubscriptionInterceptor<?> next : mySubscriptionInterceptorList) {
|
||||||
next.submitResourceModified(msg);
|
next.submitResourceModified(msg);
|
||||||
}
|
}
|
||||||
|
return null;
|
||||||
|
});
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
public void cancelAll() {
|
public void cancelAll() {
|
||||||
|
|
|
@ -34,8 +34,6 @@ public class SubscriptionRestHookInterceptor extends BaseSubscriptionInterceptor
|
||||||
@Override
|
@Override
|
||||||
protected Optional<MessageHandler> createDeliveryHandler(CanonicalSubscription theSubscription) {
|
protected Optional<MessageHandler> createDeliveryHandler(CanonicalSubscription theSubscription) {
|
||||||
SubscriptionDeliveringRestHookSubscriber value = new SubscriptionDeliveringRestHookSubscriber(getSubscriptionDao(), getChannelType(), this);
|
SubscriptionDeliveringRestHookSubscriber value = new SubscriptionDeliveringRestHookSubscriber(getSubscriptionDao(), getChannelType(), this);
|
||||||
// FIXME: remove
|
|
||||||
ourLog.info("** Creating delivery subscriber " + value + " for " + theSubscription.getIdElementString());
|
|
||||||
return Optional.of(value);
|
return Optional.of(value);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -105,7 +105,7 @@ public class TestR4Config extends BaseJavaConfigR4 {
|
||||||
|
|
||||||
DataSource dataSource = ProxyDataSourceBuilder
|
DataSource dataSource = ProxyDataSourceBuilder
|
||||||
.create(retVal)
|
.create(retVal)
|
||||||
// .logQueryBySlf4j(SLF4JLogLevel.INFO, "SQL")
|
.logQueryBySlf4j(SLF4JLogLevel.INFO, "SQL")
|
||||||
.logSlowQueryBySlf4j(10, TimeUnit.SECONDS)
|
.logSlowQueryBySlf4j(10, TimeUnit.SECONDS)
|
||||||
.countQuery(new ThreadQueryCountHolder())
|
.countQuery(new ThreadQueryCountHolder())
|
||||||
.build();
|
.build();
|
||||||
|
@ -125,7 +125,6 @@ public class TestR4Config extends BaseJavaConfigR4 {
|
||||||
|
|
||||||
private Properties jpaProperties() {
|
private Properties jpaProperties() {
|
||||||
Properties extraProperties = new Properties();
|
Properties extraProperties = new Properties();
|
||||||
extraProperties.put("hibernate.jdbc.batch_size", "1");
|
|
||||||
extraProperties.put("hibernate.format_sql", "false");
|
extraProperties.put("hibernate.format_sql", "false");
|
||||||
extraProperties.put("hibernate.show_sql", "false");
|
extraProperties.put("hibernate.show_sql", "false");
|
||||||
extraProperties.put("hibernate.hbm2ddl.auto", "update");
|
extraProperties.put("hibernate.hbm2ddl.auto", "update");
|
||||||
|
|
|
@ -4,6 +4,7 @@ import ca.uhn.fhir.jpa.dao.DaoConfig;
|
||||||
import ca.uhn.fhir.jpa.dao.SearchParameterMap;
|
import ca.uhn.fhir.jpa.dao.SearchParameterMap;
|
||||||
import ca.uhn.fhir.rest.param.StringParam;
|
import ca.uhn.fhir.rest.param.StringParam;
|
||||||
import ca.uhn.fhir.util.TestUtil;
|
import ca.uhn.fhir.util.TestUtil;
|
||||||
|
import net.ttddyy.dsproxy.listener.ThreadQueryCountHolder;
|
||||||
import org.hl7.fhir.instance.model.api.IIdType;
|
import org.hl7.fhir.instance.model.api.IIdType;
|
||||||
import org.hl7.fhir.r4.model.Bundle;
|
import org.hl7.fhir.r4.model.Bundle;
|
||||||
import org.hl7.fhir.r4.model.IdType;
|
import org.hl7.fhir.r4.model.IdType;
|
||||||
|
@ -109,6 +110,26 @@ public class FhirResourceDaoR4CreateTest extends BaseJpaR4Test {
|
||||||
assertThat(output.getEntry().get(1).getResponse().getLocation(), matchesPattern("Patient/[a-z0-9]{8}-.*"));
|
assertThat(output.getEntry().get(1).getResponse().getLocation(), matchesPattern("Patient/[a-z0-9]{8}-.*"));
|
||||||
|
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testWritesPerformMinimalSqlStatements() {
|
||||||
|
Patient p = new Patient();
|
||||||
|
p.addIdentifier().setSystem("sys1").setValue("val1");
|
||||||
|
p.addIdentifier().setSystem("sys2").setValue("val2");
|
||||||
|
|
||||||
|
ourLog.info("** About to perform write");
|
||||||
|
new ThreadQueryCountHolder().getOrCreateQueryCount("").setInsert(0);
|
||||||
|
new ThreadQueryCountHolder().getOrCreateQueryCount("").setUpdate(0);
|
||||||
|
|
||||||
|
myPatientDao.create(p);
|
||||||
|
|
||||||
|
ourLog.info("** Done performing write");
|
||||||
|
|
||||||
|
ourLog.info("Inserts: {}", new ThreadQueryCountHolder().getOrCreateQueryCount("").getInsert());
|
||||||
|
ourLog.info("Updates: {}", new ThreadQueryCountHolder().getOrCreateQueryCount("").getUpdate());
|
||||||
|
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
|
@ -105,6 +105,17 @@
|
||||||
batch operation. As of this version, when authorizing a transaction operation
|
batch operation. As of this version, when authorizing a transaction operation
|
||||||
(via the transaction() rule), both batch and transaction will be allowed.
|
(via the transaction() rule), both batch and transaction will be allowed.
|
||||||
</action>
|
</action>
|
||||||
|
<action type="add">
|
||||||
|
The JPA server now automatically supplies several appropriate hibernate performance
|
||||||
|
settings as long as the JPA EntityManagerFactory was created using HAPI FHIR's
|
||||||
|
built-in method for creating it.
|
||||||
|
<![CDATA[<br/><br/>]]>
|
||||||
|
Existing JPA projects should consider using
|
||||||
|
<![CDATA[<code>super.entityManagerFactory()</code>]]>
|
||||||
|
as shown in
|
||||||
|
<![CDATA[<a href="https://github.com/hapifhir/hapi-fhir-jpaserver-starter/blob/master/src/main/java/ca/uhn/fhir/jpa/demo/FhirServerConfig.java#L62">the example project</a>]]>
|
||||||
|
if they are not already.
|
||||||
|
</action>
|
||||||
</release>
|
</release>
|
||||||
|
|
||||||
<release version="3.5.0" date="2018-09-17">
|
<release version="3.5.0" date="2018-09-17">
|
||||||
|
|
Loading…
Reference in New Issue