Speed up subscription triggering (#5377)

* Improve subscription trigger speed

* Speed up subscription triggering

* Add changelog

* Spotless

* Move changelog
This commit is contained in:
James Agnew 2023-10-20 06:51:35 -04:00 committed by GitHub
parent e2ca967fd9
commit 20c8109a32
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
9 changed files with 362 additions and 251 deletions

View File

@ -0,0 +1,5 @@
---
type: perf
issue: 5377
jira: SMILE-7545
title: "Subscription triggering via the `$trigger-subscription` operation is now multi-threaded, which significantly improves performance for large data sets."

View File

@ -51,11 +51,11 @@ import ca.uhn.fhir.util.ParametersUtil;
import ca.uhn.fhir.util.StopWatch;
import ca.uhn.fhir.util.UrlUtil;
import ca.uhn.fhir.util.ValidateUtil;
import com.google.common.collect.Lists;
import org.apache.commons.lang3.ObjectUtils;
import org.apache.commons.lang3.Validate;
import org.apache.commons.lang3.concurrent.BasicThreadFactory;
import org.apache.commons.lang3.time.DateUtils;
import org.apache.commons.lang3.tuple.Pair;
import org.hl7.fhir.dstu2.model.IdType;
import org.hl7.fhir.instance.model.api.IBaseParameters;
import org.hl7.fhir.instance.model.api.IBaseResource;
@ -77,6 +77,7 @@ import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
import javax.annotation.PostConstruct;
@ -104,7 +105,7 @@ public class SubscriptionTriggeringSvcImpl implements ISubscriptionTriggeringSvc
private JpaStorageSettings myStorageSettings;
@Autowired
private ISearchCoordinatorSvc mySearchCoordinatorSvc;
private ISearchCoordinatorSvc<? extends IResourcePersistentId<?>> mySearchCoordinatorSvc;
@Autowired
private MatchUrlService myMatchUrlService;
@ -240,13 +241,12 @@ public class SubscriptionTriggeringSvcImpl implements ISubscriptionTriggeringSvc
ourLog.info("Starting pass of subscription triggering job {}", theJobDetails.getJobId());
// Submit individual resources
int totalSubmitted = 0;
List<Pair<String, Future<Void>>> futures = new ArrayList<>();
while (theJobDetails.getRemainingResourceIds().size() > 0 && totalSubmitted < myMaxSubmitPerPass) {
totalSubmitted++;
AtomicInteger totalSubmitted = new AtomicInteger(0);
List<Future<?>> futures = new ArrayList<>();
while (!theJobDetails.getRemainingResourceIds().isEmpty() && totalSubmitted.get() < myMaxSubmitPerPass) {
totalSubmitted.incrementAndGet();
String nextResourceId = theJobDetails.getRemainingResourceIds().remove(0);
Future<Void> future = submitResource(theJobDetails.getSubscriptionId(), nextResourceId);
futures.add(Pair.of(nextResourceId, future));
submitResource(theJobDetails.getSubscriptionId(), nextResourceId);
}
// Make sure these all succeeded in submitting
@ -260,7 +260,7 @@ public class SubscriptionTriggeringSvcImpl implements ISubscriptionTriggeringSvc
// to the broker. Note that querying of resource can be done synchronously or asynchronously
if (isInitialStep(theJobDetails)
&& isNotEmpty(theJobDetails.getRemainingSearchUrls())
&& totalSubmitted < myMaxSubmitPerPass) {
&& totalSubmitted.get() < myMaxSubmitPerPass) {
String nextSearchUrl = theJobDetails.getRemainingSearchUrls().remove(0);
RuntimeResourceDefinition resourceDef = UrlUtil.parseUrlResourceType(myFhirContext, nextSearchUrl);
@ -295,144 +295,88 @@ public class SubscriptionTriggeringSvcImpl implements ISubscriptionTriggeringSvc
theJobDetails.setCurrentSearchLastUploadedIndex(-1);
}
// processing step for synchronous processing mode
if (isNotBlank(theJobDetails.getCurrentSearchUrl()) && totalSubmitted < myMaxSubmitPerPass) {
List<IBaseResource> allCurrentResources;
int fromIndex = theJobDetails.getCurrentSearchLastUploadedIndex() + 1;
String searchUrl = theJobDetails.getCurrentSearchUrl();
ourLog.info(
"Triggered job [{}] - Starting synchronous processing at offset {} and index {}",
theJobDetails.getJobId(),
theJobDetails.getCurrentOffset(),
fromIndex);
int submittableCount = myMaxSubmitPerPass - totalSubmitted;
int toIndex = fromIndex + submittableCount;
if (nonNull(search) && !search.isEmpty()) {
// we already have data from the initial step so process as much as we can.
ourLog.info("Triggered job[{}] will process up to {} resources", theJobDetails.getJobId(), toIndex);
allCurrentResources = search.getResources(0, toIndex);
} else {
if (theJobDetails.getCurrentSearchCount() != null) {
toIndex = Math.min(toIndex, theJobDetails.getCurrentSearchCount());
}
RuntimeResourceDefinition resourceDef = UrlUtil.parseUrlResourceType(myFhirContext, searchUrl);
String queryPart = searchUrl.substring(searchUrl.indexOf('?'));
SearchParameterMap params = myMatchUrlService.translateMatchUrl(queryPart, resourceDef);
int offset = theJobDetails.getCurrentOffset() + fromIndex;
params.setOffset(offset);
params.setCount(toIndex);
ourLog.info(
"Triggered job[{}] requesting {} resources from offset {}",
theJobDetails.getJobId(),
toIndex,
offset);
search =
mySearchService.executeQuery(resourceDef.getName(), params, RequestPartitionId.allPartitions());
allCurrentResources = search.getAllResources();
}
ourLog.info(
"Triggered job[{}] delivering {} resources", theJobDetails.getJobId(), allCurrentResources.size());
int highestIndexSubmitted = theJobDetails.getCurrentSearchLastUploadedIndex();
for (IBaseResource nextResource : allCurrentResources) {
Future<Void> future = submitResource(theJobDetails.getSubscriptionId(), nextResource);
futures.add(Pair.of(nextResource.getIdElement().getIdPart(), future));
totalSubmitted++;
highestIndexSubmitted++;
}
if (validateFuturesAndReturnTrueIfWeShouldAbort(futures)) {
return;
}
theJobDetails.setCurrentSearchLastUploadedIndex(highestIndexSubmitted);
ourLog.info(
"Triggered job[{}] lastUploadedIndex is {}",
theJobDetails.getJobId(),
theJobDetails.getCurrentSearchLastUploadedIndex());
if (allCurrentResources.isEmpty()
|| nonNull(theJobDetails.getCurrentSearchCount())
&& toIndex >= theJobDetails.getCurrentSearchCount()) {
ourLog.info(
"Triggered job[{}] for search URL {} has completed ",
theJobDetails.getJobId(),
theJobDetails.getCurrentSearchUrl());
theJobDetails.setCurrentSearchResourceType(null);
theJobDetails.clearCurrentSearchUrl();
theJobDetails.setCurrentSearchLastUploadedIndex(-1);
theJobDetails.setCurrentSearchCount(null);
}
/*
* Processing step for synchronous processing mode - This is only called if the
* server is configured to force offset searches, ie using ForceSynchronousSearchInterceptor.
* Otherwise, we'll always do async mode.
*/
if (isNotBlank(theJobDetails.getCurrentSearchUrl()) && totalSubmitted.get() < myMaxSubmitPerPass) {
processSynchronous(theJobDetails, totalSubmitted, futures, search);
}
// processing step for asynchronous processing mode
if (isNotBlank(theJobDetails.getCurrentSearchUuid()) && totalSubmitted < myMaxSubmitPerPass) {
int fromIndex = theJobDetails.getCurrentSearchLastUploadedIndex() + 1;
if (isNotBlank(theJobDetails.getCurrentSearchUuid()) && totalSubmitted.get() < myMaxSubmitPerPass) {
processAsynchronous(theJobDetails, totalSubmitted, futures);
}
IFhirResourceDao<?> resourceDao =
myDaoRegistry.getResourceDao(theJobDetails.getCurrentSearchResourceType());
ourLog.info(
"Subscription trigger job[{}] triggered {} resources in {}ms ({} res / second)",
theJobDetails.getJobId(),
totalSubmitted,
sw.getMillis(),
sw.getThroughput(totalSubmitted.get(), TimeUnit.SECONDS));
}
int maxQuerySize = myMaxSubmitPerPass - totalSubmitted;
int toIndex;
if (theJobDetails.getCurrentSearchCount() != null) {
toIndex = Math.min(fromIndex + maxQuerySize, theJobDetails.getCurrentSearchCount());
} else {
toIndex = fromIndex + maxQuerySize;
}
private void processAsynchronous(
SubscriptionTriggeringJobDetails theJobDetails, AtomicInteger totalSubmitted, List<Future<?>> futures) {
int fromIndex = theJobDetails.getCurrentSearchLastUploadedIndex() + 1;
ourLog.info(
"Triggering job[{}] search {} requesting resources {} - {}",
theJobDetails.getJobId(),
theJobDetails.getCurrentSearchUuid(),
fromIndex,
toIndex);
IFhirResourceDao<?> resourceDao = myDaoRegistry.getResourceDao(theJobDetails.getCurrentSearchResourceType());
List<IResourcePersistentId<?>> resourceIds;
RequestPartitionId requestPartitionId = RequestPartitionId.allPartitions();
resourceIds = mySearchCoordinatorSvc.getResources(
theJobDetails.getCurrentSearchUuid(), fromIndex, toIndex, null, requestPartitionId);
int maxQuerySize = myMaxSubmitPerPass - totalSubmitted.get();
int toIndex;
if (theJobDetails.getCurrentSearchCount() != null) {
toIndex = Math.min(fromIndex + maxQuerySize, theJobDetails.getCurrentSearchCount());
} else {
toIndex = fromIndex + maxQuerySize;
}
ourLog.info("Triggering job[{}] delivering {} resources", theJobDetails.getJobId(), resourceIds.size());
int highestIndexSubmitted = theJobDetails.getCurrentSearchLastUploadedIndex();
ourLog.info(
"Triggering job[{}] search {} requesting resources {} - {}",
theJobDetails.getJobId(),
theJobDetails.getCurrentSearchUuid(),
fromIndex,
toIndex);
String resourceType = myFhirContext.getResourceType(theJobDetails.getCurrentSearchResourceType());
RuntimeResourceDefinition resourceDef =
myFhirContext.getResourceDefinition(theJobDetails.getCurrentSearchResourceType());
ISearchBuilder searchBuilder = mySearchBuilderFactory.newSearchBuilder(
resourceDao, resourceType, resourceDef.getImplementingClass());
List<IBaseResource> listToPopulate = new ArrayList<>();
List<? extends IResourcePersistentId<?>> allResourceIds;
RequestPartitionId requestPartitionId = RequestPartitionId.allPartitions();
allResourceIds = mySearchCoordinatorSvc.getResources(
theJobDetails.getCurrentSearchUuid(), fromIndex, toIndex, null, requestPartitionId);
myTransactionService.withSystemRequest().execute(() -> {
searchBuilder.loadResourcesByPid(
resourceIds, Collections.emptyList(), listToPopulate, false, new SystemRequestDetails());
});
ourLog.info("Triggering job[{}] delivering {} resources", theJobDetails.getJobId(), allResourceIds.size());
AtomicInteger highestIndexSubmitted = new AtomicInteger(theJobDetails.getCurrentSearchLastUploadedIndex());
for (IBaseResource nextResource : listToPopulate) {
Future<Void> future = submitResource(theJobDetails.getSubscriptionId(), nextResource);
futures.add(Pair.of(nextResource.getIdElement().getIdPart(), future));
totalSubmitted++;
highestIndexSubmitted++;
}
List<? extends List<? extends IResourcePersistentId<?>>> partitions = Lists.partition(allResourceIds, 100);
for (List<? extends IResourcePersistentId<?>> resourceIds : partitions) {
Runnable job = () -> {
String resourceType = myFhirContext.getResourceType(theJobDetails.getCurrentSearchResourceType());
RuntimeResourceDefinition resourceDef =
myFhirContext.getResourceDefinition(theJobDetails.getCurrentSearchResourceType());
ISearchBuilder searchBuilder = mySearchBuilderFactory.newSearchBuilder(
resourceDao, resourceType, resourceDef.getImplementingClass());
List<IBaseResource> listToPopulate = new ArrayList<>();
if (validateFuturesAndReturnTrueIfWeShouldAbort(futures)) {
return;
}
myTransactionService.withRequest(null).execute(() -> {
searchBuilder.loadResourcesByPid(
resourceIds, Collections.emptyList(), listToPopulate, false, new SystemRequestDetails());
});
theJobDetails.setCurrentSearchLastUploadedIndex(highestIndexSubmitted);
for (IBaseResource nextResource : listToPopulate) {
submitResource(theJobDetails.getSubscriptionId(), nextResource);
totalSubmitted.incrementAndGet();
highestIndexSubmitted.incrementAndGet();
}
};
if (resourceIds.size() == 0
Future<?> future = myExecutorService.submit(job);
futures.add(future);
}
if (!validateFuturesAndReturnTrueIfWeShouldAbort(futures)) {
theJobDetails.setCurrentSearchLastUploadedIndex(highestIndexSubmitted.get());
if (allResourceIds.isEmpty()
|| (theJobDetails.getCurrentSearchCount() != null
&& toIndex >= theJobDetails.getCurrentSearchCount())) {
ourLog.info(
@ -445,13 +389,93 @@ public class SubscriptionTriggeringSvcImpl implements ISubscriptionTriggeringSvc
theJobDetails.setCurrentSearchCount(null);
}
}
}
private void processSynchronous(
SubscriptionTriggeringJobDetails theJobDetails,
AtomicInteger totalSubmitted,
List<Future<?>> futures,
IBundleProvider search) {
List<IBaseResource> allCurrentResources;
int fromIndex = theJobDetails.getCurrentSearchLastUploadedIndex() + 1;
String searchUrl = theJobDetails.getCurrentSearchUrl();
ourLog.info(
"Subscription trigger job[{}] triggered {} resources in {}ms ({} res / second)",
"Triggered job [{}] - Starting synchronous processing at offset {} and index {}",
theJobDetails.getJobId(),
totalSubmitted,
sw.getMillis(),
sw.getThroughput(totalSubmitted, TimeUnit.SECONDS));
theJobDetails.getCurrentOffset(),
fromIndex);
int submittableCount = myMaxSubmitPerPass - totalSubmitted.get();
int toIndex = fromIndex + submittableCount;
if (nonNull(search) && !search.isEmpty()) {
if (search.getCurrentPageSize() != null) {
toIndex = search.getCurrentPageSize();
}
// we already have data from the initial step so process as much as we can.
ourLog.info("Triggered job[{}] will process up to {} resources", theJobDetails.getJobId(), toIndex);
allCurrentResources = search.getResources(0, toIndex);
} else {
if (theJobDetails.getCurrentSearchCount() != null) {
toIndex = Math.min(toIndex, theJobDetails.getCurrentSearchCount());
}
RuntimeResourceDefinition resourceDef = UrlUtil.parseUrlResourceType(myFhirContext, searchUrl);
String queryPart = searchUrl.substring(searchUrl.indexOf('?'));
SearchParameterMap params = myMatchUrlService.translateMatchUrl(queryPart, resourceDef);
int offset = theJobDetails.getCurrentOffset() + fromIndex;
params.setOffset(offset);
params.setCount(toIndex);
ourLog.info(
"Triggered job[{}] requesting {} resources from offset {}",
theJobDetails.getJobId(),
toIndex,
offset);
search = mySearchService.executeQuery(resourceDef.getName(), params, RequestPartitionId.allPartitions());
allCurrentResources = search.getResources(0, submittableCount);
}
ourLog.info("Triggered job[{}] delivering {} resources", theJobDetails.getJobId(), allCurrentResources.size());
AtomicInteger highestIndexSubmitted = new AtomicInteger(theJobDetails.getCurrentSearchLastUploadedIndex());
for (IBaseResource nextResource : allCurrentResources) {
Future<?> future =
myExecutorService.submit(() -> submitResource(theJobDetails.getSubscriptionId(), nextResource));
futures.add(future);
totalSubmitted.incrementAndGet();
highestIndexSubmitted.incrementAndGet();
}
if (!validateFuturesAndReturnTrueIfWeShouldAbort(futures)) {
theJobDetails.setCurrentSearchLastUploadedIndex(highestIndexSubmitted.get());
ourLog.info(
"Triggered job[{}] lastUploadedIndex is {}",
theJobDetails.getJobId(),
theJobDetails.getCurrentSearchLastUploadedIndex());
if (allCurrentResources.isEmpty()
|| nonNull(theJobDetails.getCurrentSearchCount())
&& toIndex > theJobDetails.getCurrentSearchCount()) {
ourLog.info(
"Triggered job[{}] for search URL {} has completed ",
theJobDetails.getJobId(),
theJobDetails.getCurrentSearchUrl());
theJobDetails.setCurrentSearchResourceType(null);
theJobDetails.clearCurrentSearchUrl();
theJobDetails.setCurrentSearchLastUploadedIndex(-1);
theJobDetails.setCurrentSearchCount(null);
}
}
}
private boolean isInitialStep(SubscriptionTriggeringJobDetails theJobDetails) {
@ -462,34 +486,31 @@ public class SubscriptionTriggeringSvcImpl implements ISubscriptionTriggeringSvc
return isInitialStep(theJobDetails);
}
private boolean validateFuturesAndReturnTrueIfWeShouldAbort(List<Pair<String, Future<Void>>> theIdToFutures) {
private boolean validateFuturesAndReturnTrueIfWeShouldAbort(List<Future<?>> theFutures) {
for (Pair<String, Future<Void>> next : theIdToFutures) {
String nextDeliveredId = next.getKey();
for (Future<?> nextFuture : theFutures) {
try {
Future<Void> nextFuture = next.getValue();
nextFuture.get();
ourLog.info("Finished redelivering {}", nextDeliveredId);
} catch (Exception e) {
ourLog.error("Failure triggering resource " + nextDeliveredId, e);
ourLog.error("Failure triggering resource", e);
return true;
}
}
// Clear the list since it will potentially get reused
theIdToFutures.clear();
theFutures.clear();
return false;
}
private Future<Void> submitResource(String theSubscriptionId, String theResourceIdToTrigger) {
private void submitResource(String theSubscriptionId, String theResourceIdToTrigger) {
org.hl7.fhir.r4.model.IdType resourceId = new org.hl7.fhir.r4.model.IdType(theResourceIdToTrigger);
IFhirResourceDao dao = myDaoRegistry.getResourceDao(resourceId.getResourceType());
IBaseResource resourceToTrigger = dao.read(resourceId, SystemRequestDetails.forAllPartitions());
return submitResource(theSubscriptionId, resourceToTrigger);
submitResource(theSubscriptionId, resourceToTrigger);
}
private Future<Void> submitResource(String theSubscriptionId, IBaseResource theResourceToTrigger) {
private void submitResource(String theSubscriptionId, IBaseResource theResourceToTrigger) {
ourLog.info(
"Submitting resource {} to subscription {}",
@ -500,24 +521,23 @@ public class SubscriptionTriggeringSvcImpl implements ISubscriptionTriggeringSvc
myFhirContext, theResourceToTrigger, ResourceModifiedMessage.OperationTypeEnum.UPDATE);
msg.setSubscriptionId(theSubscriptionId);
return myExecutorService.submit(() -> {
for (int i = 0; ; i++) {
try {
myResourceModifiedConsumer.submitResourceModified(msg);
break;
} catch (Exception e) {
if (i >= 3) {
throw new InternalErrorException(Msg.code(25) + e);
}
for (int i = 0; ; i++) {
try {
myResourceModifiedConsumer.submitResourceModified(msg);
break;
} catch (Exception e) {
if (i >= 3) {
throw new InternalErrorException(Msg.code(25) + e);
}
ourLog.warn(
"Exception while retriggering subscriptions (going to sleep and retry): {}", e.toString());
ourLog.warn("Exception while retriggering subscriptions (going to sleep and retry): {}", e.toString());
try {
Thread.sleep(1000);
} catch (InterruptedException ex) {
Thread.currentThread().interrupt();
}
}
return null;
});
}
}
public void cancelAll() {

View File

@ -44,6 +44,8 @@ import org.junit.jupiter.api.DisplayName;
import org.junit.jupiter.api.Nested;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestInstance;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.CsvSource;
import org.springframework.beans.factory.annotation.Autowired;
import javax.servlet.http.HttpServletRequest;
@ -499,14 +501,18 @@ public class SubscriptionTriggeringDstu3Test extends BaseResourceProviderDstu3Te
myInterceptorService.unregisterInterceptor(forceSynchronousSearchInterceptor);
}
@Test
public void testTriggerSubscriptionInSynchronousQueryMode() throws Exception {
((SubscriptionTriggeringSvcImpl)mySubscriptionTriggeringSvc).setMaxSubmitPerPass(10);
@ParameterizedTest
@CsvSource({
"10",
"10000"
})
public void testTriggerSubscriptionInSynchronousQueryMode(int theMaxSubmitPerpass) throws Exception {
((SubscriptionTriggeringSvcImpl)mySubscriptionTriggeringSvc).setMaxSubmitPerPass(theMaxSubmitPerpass);
String payload = "application/fhir+json";
IdType sub2id = createSubscription("Patient?", payload, ourListenerServerBase).getIdElement();
int numberOfPatient = 15;
int numberOfPatient = 200;
// Create lots
createPatientsAndWait(numberOfPatient);
@ -522,9 +528,9 @@ public class SubscriptionTriggeringDstu3Test extends BaseResourceProviderDstu3Te
.withParameter(Parameters.class, ProviderConstants.SUBSCRIPTION_TRIGGERING_PARAM_SEARCH_URL, new StringType("Patient?"))
.execute();
mySubscriptionTriggeringSvc.runDeliveryPass();
mySubscriptionTriggeringSvc.runDeliveryPass();
mySubscriptionTriggeringSvc.runDeliveryPass();
for (int i = 0; i < 20; i++) {
mySubscriptionTriggeringSvc.runDeliveryPass();
}
waitForSize(0, ourCreatedPatients);
waitForSize(numberOfPatient, ourUpdatedPatients);

View File

@ -19,6 +19,7 @@ import ca.uhn.fhir.jpa.api.model.HistoryCountModeEnum;
import ca.uhn.fhir.jpa.dao.data.ISearchParamPresentDao;
import ca.uhn.fhir.jpa.entity.TermValueSet;
import ca.uhn.fhir.jpa.entity.TermValueSetPreExpansionStatusEnum;
import ca.uhn.fhir.jpa.interceptor.ForceOffsetSearchModeInterceptor;
import ca.uhn.fhir.jpa.model.entity.ForcedId;
import ca.uhn.fhir.jpa.model.entity.ResourceTable;
import ca.uhn.fhir.jpa.model.util.JpaConstants;
@ -27,7 +28,9 @@ import ca.uhn.fhir.jpa.search.PersistedJpaSearchFirstPageBundleProvider;
import ca.uhn.fhir.jpa.searchparam.SearchParameterMap;
import ca.uhn.fhir.jpa.subscription.submit.svc.ResourceModifiedSubmitterSvc;
import ca.uhn.fhir.jpa.subscription.triggering.ISubscriptionTriggeringSvc;
import ca.uhn.fhir.jpa.subscription.triggering.SubscriptionTriggeringSvcImpl;
import ca.uhn.fhir.jpa.term.TermReadSvcImpl;
import ca.uhn.fhir.jpa.test.util.SubscriptionTestUtil;
import ca.uhn.fhir.jpa.util.SqlQuery;
import ca.uhn.fhir.rest.api.Constants;
import ca.uhn.fhir.rest.api.RestOperationTypeEnum;
@ -40,9 +43,12 @@ import ca.uhn.fhir.rest.server.SimpleBundleProvider;
import ca.uhn.fhir.rest.server.exceptions.PreconditionFailedException;
import ca.uhn.fhir.rest.server.interceptor.auth.AuthorizationInterceptor;
import ca.uhn.fhir.rest.server.interceptor.auth.PolicyEnum;
import ca.uhn.fhir.rest.server.provider.ProviderConstants;
import ca.uhn.fhir.test.utilities.ProxyUtil;
import ca.uhn.fhir.test.utilities.server.HashMapResourceProviderExtension;
import ca.uhn.fhir.test.utilities.server.RestfulServerExtension;
import ca.uhn.fhir.util.BundleBuilder;
import org.hamcrest.CoreMatchers;
import org.hl7.fhir.instance.model.api.IBaseResource;
import org.hl7.fhir.instance.model.api.IIdType;
import org.hl7.fhir.r4.model.BooleanType;
@ -94,12 +100,16 @@ import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import static ca.uhn.fhir.jpa.subscription.FhirR4Util.createSubscription;
import static org.apache.commons.lang3.StringUtils.countMatches;
import static org.awaitility.Awaitility.await;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.containsInAnyOrder;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.empty;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.not;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
@ -145,6 +155,9 @@ public class FhirResourceDaoR4QueryCountTest extends BaseResourceProviderR4Test
private ReindexStep myReindexStep;
@Autowired
private DeleteExpungeStep myDeleteExpungeStep;
@Autowired
protected SubscriptionTestUtil mySubscriptionTestUtil;
@AfterEach
public void afterResetDao() {
@ -167,6 +180,8 @@ public class FhirResourceDaoR4QueryCountTest extends BaseResourceProviderR4Test
myFhirContext.getParserOptions().setStripVersionsFromReferences(true);
TermReadSvcImpl.setForceDisableHibernateSearchForUnitTest(false);
mySubscriptionTestUtil.unregisterSubscriptionInterceptor();
}
@Override
@ -3073,8 +3088,6 @@ public class FhirResourceDaoR4QueryCountTest extends BaseResourceProviderR4Test
Bundle input = bb.getBundleTyped();
// input.getEntry().get(0).
myCaptureQueriesListener.clear();
mySystemDao.transaction(mySrd, input);
assertEquals(1, myCaptureQueriesListener.countSelectQueriesForCurrentThread());
@ -3087,37 +3100,102 @@ public class FhirResourceDaoR4QueryCountTest extends BaseResourceProviderR4Test
*/
@SuppressWarnings("unchecked")
@Test
public void testTriggerSubscription() throws Exception {
public void testTriggerSubscription_Sync() throws Exception {
// Setup
IntStream.range(0, 200).forEach(i->createAPatient());
myStorageSettings.addSupportedSubscriptionType(org.hl7.fhir.dstu2.model.Subscription.SubscriptionChannelType.RESTHOOK);
myResourceModifiedSubmitterSvc.startIfNeeded();
mySubscriptionTestUtil.registerRestHookInterceptor();
ForceOffsetSearchModeInterceptor interceptor = new ForceOffsetSearchModeInterceptor();
myInterceptorRegistry.registerInterceptor(interceptor);
try {
String payload = "application/fhir+json";
Subscription subscription = createSubscription("Patient?", payload, ourServer.getBaseUrl(), null);
IIdType subscriptionId = mySubscriptionDao.create(subscription, mySrd).getId();
for (int i = 0; i < 10; i++) {
createPatient(withActiveTrue());
waitForActivatedSubscriptionCount(1);
mySubscriptionTriggeringSvc.triggerSubscription(null, List.of(new StringType("Patient?")), subscriptionId);
// Test
myCaptureQueriesListener.clear();
mySubscriptionTriggeringSvc.runDeliveryPass();
mySubscriptionTriggeringSvc.runDeliveryPass();
mySubscriptionTriggeringSvc.runDeliveryPass();
mySubscriptionTriggeringSvc.runDeliveryPass();
mySubscriptionTriggeringSvc.runDeliveryPass();
myCaptureQueriesListener.logSelectQueries();
ourPatientProvider.waitForUpdateCount(200);
// Validate
assertEquals(7, myCaptureQueriesListener.countSelectQueriesForCurrentThread());
assertEquals(0, myCaptureQueriesListener.countUpdateQueriesForCurrentThread());
assertEquals(0, myCaptureQueriesListener.countInsertQueriesForCurrentThread());
assertEquals(0, myCaptureQueriesListener.countDeleteQueriesForCurrentThread());
} finally {
myInterceptorRegistry.unregisterInterceptor(interceptor);
}
}
@Test
public void testTriggerSubscription_Async() throws Exception {
// Setup
IntStream.range(0, 200).forEach(i->createAPatient());
mySubscriptionTestUtil.registerRestHookInterceptor();
String payload = "application/fhir+json";
Subscription subscription = createSubscription("Patient?", payload, ourServer.getBaseUrl(), null);
IIdType subId = mySubscriptionDao.create(subscription, mySrd).getId();
Subscription subscription = new Subscription();
subscription.getChannel().setEndpoint(ourServer.getBaseUrl());
subscription.getChannel().setType(Subscription.SubscriptionChannelType.RESTHOOK);
subscription.getChannel().setPayload(Constants.CT_FHIR_JSON_NEW);
subscription.setStatus(Subscription.SubscriptionStatus.REQUESTED);
subscription.setCriteria("Patient?active=true");
IIdType subscriptionId = mySubscriptionDao.create(subscription, mySrd).getId().toUnqualifiedVersionless();
waitForActivatedSubscriptionCount(1);
mySubscriptionTriggeringSvc.triggerSubscription(null, List.of(new StringType("Patient?active=true")), subscriptionId);
// Test
myCaptureQueriesListener.clear();
mySubscriptionTriggeringSvc.runDeliveryPass();
ourPatientProvider.waitForUpdateCount(10);
Parameters response = myClient
.operation()
.onInstance(subId)
.named(JpaConstants.OPERATION_TRIGGER_SUBSCRIPTION)
.withParameter(Parameters.class, ProviderConstants.SUBSCRIPTION_TRIGGERING_PARAM_SEARCH_URL, new StringType("Patient?"))
.execute();
String responseValue = response.getParameter().get(0).getValue().primitiveValue();
assertThat(responseValue, CoreMatchers.containsString("Subscription triggering job submitted as JOB ID"));
// Validate
assertEquals(6, myCaptureQueriesListener.countSelectQueriesForCurrentThread());
assertEquals(1, myCaptureQueriesListener.countUpdateQueriesForCurrentThread());
assertEquals(11, myCaptureQueriesListener.countInsertQueriesForCurrentThread());
assertEquals(0, myCaptureQueriesListener.countDeleteQueriesForCurrentThread());
assertEquals(3, myCaptureQueriesListener.countSelectQueries());
assertEquals(0, myCaptureQueriesListener.countInsertQueries());
assertEquals(0, myCaptureQueriesListener.countUpdateQueries());
assertEquals(0, myCaptureQueriesListener.countDeleteQueries());
myCaptureQueriesListener.clear();
mySubscriptionTriggeringSvc.runDeliveryPass();
myCaptureQueriesListener.logInsertQueries();
assertEquals(15, myCaptureQueriesListener.countSelectQueries());
assertEquals(201, myCaptureQueriesListener.countInsertQueries());
assertEquals(3, myCaptureQueriesListener.countUpdateQueries());
assertEquals(0, myCaptureQueriesListener.countDeleteQueries());
myCaptureQueriesListener.clear();
mySubscriptionTriggeringSvc.runDeliveryPass();
assertEquals(2, myCaptureQueriesListener.countSelectQueries());
assertEquals(0, myCaptureQueriesListener.countInsertQueries());
assertEquals(0, myCaptureQueriesListener.countUpdateQueries());
assertEquals(0, myCaptureQueriesListener.countDeleteQueries());
myCaptureQueriesListener.clear();
mySubscriptionTriggeringSvc.runDeliveryPass();
assertEquals(0, myCaptureQueriesListener.countSelectQueries());
assertEquals(0, myCaptureQueriesListener.countInsertQueries());
assertEquals(0, myCaptureQueriesListener.countUpdateQueries());
assertEquals(0, myCaptureQueriesListener.countDeleteQueries());
SubscriptionTriggeringSvcImpl svc = ProxyUtil.getSingletonTarget(mySubscriptionTriggeringSvc, SubscriptionTriggeringSvcImpl.class);
assertEquals(0, svc.getActiveJobCount());
assertEquals(0, ourPatientProvider.getCountCreate());
await().until(ourPatientProvider::getCountUpdate, equalTo(200L));
}

View File

@ -12,12 +12,14 @@ import org.hl7.fhir.r4.model.Observation;
import org.hl7.fhir.r4.model.Patient;
import org.hl7.fhir.r4.model.Subscription;
import javax.annotation.Nullable;
public class FhirR4Util {
public static final String LPI_CODESYSTEM = "http://cognitivemedicine.com/lpi";
public static final String LPI_CODE = "LPI-FHIR";
public static Subscription createSubscription(String criteria, String payload, String endpoint, IGenericClient client) {
public static Subscription createSubscription(String criteria, String payload, String endpoint, @Nullable IGenericClient client) {
Subscription subscription = new Subscription();
subscription.setReason("Monitor new neonatal function (note, age will be determined by the monitor)");
subscription.setStatus(Subscription.SubscriptionStatus.REQUESTED);
@ -29,8 +31,10 @@ public class FhirR4Util {
channel.setEndpoint(endpoint);
subscription.setChannel(channel);
MethodOutcome methodOutcome = client.create().resource(subscription).execute();
subscription.setId(methodOutcome.getId().getIdPart());
if (client != null) {
MethodOutcome methodOutcome = client.create().resource(subscription).execute();
subscription.setId(methodOutcome.getId().getIdPart());
}
return subscription;
}

View File

@ -79,7 +79,8 @@ public class MessageSubscriptionR4Test extends BaseSubscriptionsR4Test {
myStorageSettings.setTagStorageMode(new JpaStorageSettings().getTagStorageMode());
}
@BeforeEach
@Override
@BeforeEach
public void beforeRegisterRestHookListener() {
mySubscriptionTestUtil.registerMessageInterceptor();

View File

@ -5,6 +5,7 @@ import ca.uhn.fhir.jpa.api.config.JpaStorageSettings;
import ca.uhn.fhir.jpa.subscription.BaseSubscriptionsR4Test;
import ca.uhn.fhir.jpa.subscription.submit.svc.ResourceModifiedSubmitterSvc;
import ca.uhn.fhir.jpa.test.util.StoppableSubscriptionDeliveringRestHookSubscriber;
import ca.uhn.fhir.jpa.test.util.SubscriptionTestUtil;
import ca.uhn.fhir.jpa.topic.SubscriptionTopicDispatcher;
import ca.uhn.fhir.jpa.topic.SubscriptionTopicRegistry;
import ca.uhn.fhir.model.primitive.IdDt;

View File

@ -208,40 +208,31 @@ public class CircularQueueCaptureQueriesListener extends BaseCaptureQueriesListe
* Log all captured UPDATE queries
*/
public String logUpdateQueriesForCurrentThread() {
List<String> queries = getUpdateQueriesForCurrentThread().stream()
.map(CircularQueueCaptureQueriesListener::formatQueryAsSql)
.collect(Collectors.toList());
String joined = String.join("\n", queries);
List<SqlQuery> queries = getUpdateQueriesForCurrentThread();
List<String> queriesStrings = renderQueriesForLogging(true, true, queries);
String joined = String.join("\n", queriesStrings);
ourLog.info("Update Queries:\n{}", joined);
return joined;
}
/**
* Log all captured SELECT queries
*
* @return
*/
public String logSelectQueriesForCurrentThread(int... theIndexes) {
List<String> queries = getSelectQueriesForCurrentThread().stream()
.map(CircularQueueCaptureQueriesListener::formatQueryAsSql)
.collect(Collectors.toList());
List<SqlQuery> queries = getSelectQueriesForCurrentThread();
List<String> queriesStrings = renderQueriesForLogging(true, true, queries);
List<String> newList = new ArrayList<>();
if (theIndexes != null && theIndexes.length > 0) {
for (int i = 0; i < theIndexes.length; i++) {
int index = theIndexes[i];
newList.add("[" + index + "] " + queries.get(index));
}
} else {
for (int i = 0; i < queries.size(); i++) {
newList.add("[" + i + "] " + queries.get(i));
for (int index : theIndexes) {
newList.add(queriesStrings.get(index));
}
queriesStrings = newList;
}
queries = newList;
String queriesAsString = String.join("\n", queries);
ourLog.info("Select Queries:\n{}", queriesAsString);
return queriesAsString;
String joined = String.join("\n", queriesStrings);
ourLog.info("Select Queries:\n{}", joined);
return joined;
}
/**
@ -256,20 +247,32 @@ public class CircularQueueCaptureQueriesListener extends BaseCaptureQueriesListe
*/
public List<SqlQuery> logSelectQueries(boolean theInlineParams, boolean theFormatSql) {
List<SqlQuery> queries = getSelectQueries();
List<String> queriesStrings = queries.stream()
.map(t -> CircularQueueCaptureQueriesListener.formatQueryAsSql(t, theInlineParams, theFormatSql))
.collect(Collectors.toList());
List<String> queriesStrings = renderQueriesForLogging(theInlineParams, theFormatSql, queries);
ourLog.info("Select Queries:\n{}", String.join("\n", queriesStrings));
return queries;
}
@Nonnull
private static List<String> renderQueriesForLogging(
boolean theInlineParams, boolean theFormatSql, List<SqlQuery> queries) {
List<String> queriesStrings = new ArrayList<>();
for (int i = 0; i < queries.size(); i++) {
SqlQuery query = queries.get(i);
String remderedString = "[" + i + "] "
+ CircularQueueCaptureQueriesListener.formatQueryAsSql(query, theInlineParams, theFormatSql);
queriesStrings.add(remderedString);
}
return queriesStrings;
}
/**
* Log first captured SELECT query
*/
public void logFirstSelectQueryForCurrentThread() {
boolean inlineParams = true;
String firstSelectQuery = getSelectQueriesForCurrentThread().stream()
.findFirst()
.map(CircularQueueCaptureQueriesListener::formatQueryAsSql)
.map(t -> CircularQueueCaptureQueriesListener.formatQueryAsSql(t, inlineParams, inlineParams))
.orElse("NONE FOUND");
ourLog.info("First select SqlQuery:\n{}", firstSelectQuery);
}
@ -278,10 +281,9 @@ public class CircularQueueCaptureQueriesListener extends BaseCaptureQueriesListe
* Log all captured INSERT queries
*/
public String logInsertQueriesForCurrentThread() {
List<String> queries = getInsertQueriesForCurrentThread().stream()
.map(CircularQueueCaptureQueriesListener::formatQueryAsSql)
.collect(Collectors.toList());
String queriesAsString = String.join("\n", queries);
List<SqlQuery> queries = getInsertQueriesForCurrentThread();
List<String> queriesStrings = renderQueriesForLogging(true, true, queries);
String queriesAsString = String.join("\n", queriesStrings);
ourLog.info("Insert Queries:\n{}", queriesAsString);
return queriesAsString;
}
@ -290,20 +292,18 @@ public class CircularQueueCaptureQueriesListener extends BaseCaptureQueriesListe
* Log all captured queries
*/
public void logAllQueriesForCurrentThread() {
List<String> queries = getAllQueriesForCurrentThread().stream()
.map(CircularQueueCaptureQueriesListener::formatQueryAsSql)
.collect(Collectors.toList());
ourLog.info("Queries:\n{}", String.join("\n", queries));
List<SqlQuery> queries = getAllQueriesForCurrentThread();
List<String> queriesStrings = renderQueriesForLogging(true, true, queries);
ourLog.info("Queries:\n{}", String.join("\n", queriesStrings));
}
/**
* Log all captured queries
*/
public void logAllQueries() {
List<String> queries = getCapturedQueries().stream()
.map(CircularQueueCaptureQueriesListener::formatQueryAsSql)
.collect(Collectors.toList());
ourLog.info("Queries:\n{}", String.join("\n", queries));
List<SqlQuery> queries = getCapturedQueries();
List<String> queriesStrings = renderQueriesForLogging(true, true, queries);
ourLog.info("Queries:\n{}", String.join("\n", queriesStrings));
}
/**
@ -317,10 +317,12 @@ public class CircularQueueCaptureQueriesListener extends BaseCaptureQueriesListe
* Log all captured INSERT queries
*/
public int logInsertQueries(Predicate<SqlQuery> theInclusionPredicate) {
List<SqlQuery> insertQueries = getInsertQueries();
List<String> queries = insertQueries.stream()
List<SqlQuery> insertQueries = getInsertQueries().stream()
.filter(t -> theInclusionPredicate == null || theInclusionPredicate.test(t))
.map(CircularQueueCaptureQueriesListener::formatQueryAsSql)
.collect(Collectors.toList());
boolean inlineParams = true;
List<String> queries = insertQueries.stream()
.map(t -> CircularQueueCaptureQueriesListener.formatQueryAsSql(t, inlineParams, inlineParams))
.collect(Collectors.toList());
ourLog.info("Insert Queries:\n{}", String.join("\n", queries));
@ -331,23 +333,20 @@ public class CircularQueueCaptureQueriesListener extends BaseCaptureQueriesListe
* Log all captured INSERT queries
*/
public int logUpdateQueries() {
List<SqlQuery> updateQueries = getUpdateQueries();
List<String> queries = updateQueries.stream()
.map(CircularQueueCaptureQueriesListener::formatQueryAsSql)
.collect(Collectors.toList());
ourLog.info("Update Queries:\n{}", String.join("\n", queries));
List<SqlQuery> queries = getUpdateQueries();
List<String> queriesStrings = renderQueriesForLogging(true, true, queries);
ourLog.info("Update Queries:\n{}", String.join("\n", queriesStrings));
return countQueries(updateQueries);
return countQueries(queries);
}
/**
* Log all captured DELETE queries
*/
public String logDeleteQueriesForCurrentThread() {
List<String> queries = getDeleteQueriesForCurrentThread().stream()
.map(CircularQueueCaptureQueriesListener::formatQueryAsSql)
.collect(Collectors.toList());
String joined = String.join("\n", queries);
List<SqlQuery> queries = getDeleteQueriesForCurrentThread();
List<String> queriesStrings = renderQueriesForLogging(true, true, queries);
String joined = String.join("\n", queriesStrings);
ourLog.info("Delete Queries:\n{}", joined);
return joined;
}
@ -356,13 +355,11 @@ public class CircularQueueCaptureQueriesListener extends BaseCaptureQueriesListe
* Log all captured DELETE queries
*/
public int logDeleteQueries() {
List<SqlQuery> deleteQueries = getDeleteQueries();
List<String> queries = deleteQueries.stream()
.map(CircularQueueCaptureQueriesListener::formatQueryAsSql)
.collect(Collectors.toList());
ourLog.info("Delete Queries:\n{}", String.join("\n", queries));
List<SqlQuery> queries = getDeleteQueries();
List<String> queriesStrings = renderQueriesForLogging(true, true, queries);
ourLog.info("Delete Queries:\n{}", String.join("\n", queriesStrings));
return countQueries(deleteQueries);
return countQueries(queries);
}
public int countSelectQueries() {

View File

@ -5,7 +5,6 @@ import java.util.List;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import javax.persistence.EntityManager;
import org.springframework.transaction.PlatformTransactionManager;
import ca.uhn.fhir.jpa.api.config.JpaStorageSettings;