From 20c8109a329a158f3b57f466ea7e764d3871cd53 Mon Sep 17 00:00:00 2001 From: James Agnew Date: Fri, 20 Oct 2023 06:51:35 -0400 Subject: [PATCH] Speed up subscription triggering (#5377) * Improve subscription trigger speed * Speed up subscription triggering * Add changelogf * Spotless * Move changelog --- ...improve-subscription-triggering-speed.yaml | 5 + .../SubscriptionTriggeringSvcImpl.java | 342 +++++++++--------- .../SubscriptionTriggeringDstu3Test.java | 20 +- .../r4/FhirResourceDaoR4QueryCountTest.java | 124 +++++-- .../uhn/fhir/jpa/subscription/FhirR4Util.java | 10 +- .../message/MessageSubscriptionR4Test.java | 3 +- .../resthook/RestHookTestR4Test.java | 1 + .../CircularQueueCaptureQueriesListener.java | 107 +++--- .../resources/vm/jpa_spring_beans_java.vm | 1 - 9 files changed, 362 insertions(+), 251 deletions(-) create mode 100644 hapi-fhir-docs/src/main/resources/ca/uhn/hapi/fhir/changelog/6_10_0/5377-improve-subscription-triggering-speed.yaml diff --git a/hapi-fhir-docs/src/main/resources/ca/uhn/hapi/fhir/changelog/6_10_0/5377-improve-subscription-triggering-speed.yaml b/hapi-fhir-docs/src/main/resources/ca/uhn/hapi/fhir/changelog/6_10_0/5377-improve-subscription-triggering-speed.yaml new file mode 100644 index 00000000000..1ecda6db9a0 --- /dev/null +++ b/hapi-fhir-docs/src/main/resources/ca/uhn/hapi/fhir/changelog/6_10_0/5377-improve-subscription-triggering-speed.yaml @@ -0,0 +1,5 @@ +--- +type: perf +issue: 5377 +jira: SMILE-7545 +title: "Subscription triggering via the `$trigger-subscription` operation is now multi-threaded, which significantly improves performance for large data sets." diff --git a/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/triggering/SubscriptionTriggeringSvcImpl.java b/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/triggering/SubscriptionTriggeringSvcImpl.java index 2cbd1c0d291..55563d08c71 100644 --- a/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/triggering/SubscriptionTriggeringSvcImpl.java +++ b/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/triggering/SubscriptionTriggeringSvcImpl.java @@ -51,11 +51,11 @@ import ca.uhn.fhir.util.ParametersUtil; import ca.uhn.fhir.util.StopWatch; import ca.uhn.fhir.util.UrlUtil; import ca.uhn.fhir.util.ValidateUtil; +import com.google.common.collect.Lists; import org.apache.commons.lang3.ObjectUtils; import org.apache.commons.lang3.Validate; import org.apache.commons.lang3.concurrent.BasicThreadFactory; import org.apache.commons.lang3.time.DateUtils; -import org.apache.commons.lang3.tuple.Pair; import org.hl7.fhir.dstu2.model.IdType; import org.hl7.fhir.instance.model.api.IBaseParameters; import org.hl7.fhir.instance.model.api.IBaseResource; @@ -77,6 +77,7 @@ import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.RejectedExecutionHandler; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; import java.util.stream.Collectors; import javax.annotation.Nullable; import javax.annotation.PostConstruct; @@ -104,7 +105,7 @@ public class SubscriptionTriggeringSvcImpl implements ISubscriptionTriggeringSvc private JpaStorageSettings myStorageSettings; @Autowired - private ISearchCoordinatorSvc mySearchCoordinatorSvc; + private ISearchCoordinatorSvc> mySearchCoordinatorSvc; @Autowired private MatchUrlService myMatchUrlService; @@ -240,13 +241,12 @@ public class SubscriptionTriggeringSvcImpl implements ISubscriptionTriggeringSvc ourLog.info("Starting pass of subscription triggering job {}", theJobDetails.getJobId()); // Submit individual resources - int totalSubmitted = 0; - List>> futures = new ArrayList<>(); - while (theJobDetails.getRemainingResourceIds().size() > 0 && totalSubmitted < myMaxSubmitPerPass) { - totalSubmitted++; + AtomicInteger totalSubmitted = new AtomicInteger(0); + List> futures = new ArrayList<>(); + while (!theJobDetails.getRemainingResourceIds().isEmpty() && totalSubmitted.get() < myMaxSubmitPerPass) { + totalSubmitted.incrementAndGet(); String nextResourceId = theJobDetails.getRemainingResourceIds().remove(0); - Future future = submitResource(theJobDetails.getSubscriptionId(), nextResourceId); - futures.add(Pair.of(nextResourceId, future)); + submitResource(theJobDetails.getSubscriptionId(), nextResourceId); } // Make sure these all succeeded in submitting @@ -260,7 +260,7 @@ public class SubscriptionTriggeringSvcImpl implements ISubscriptionTriggeringSvc // to the broker. Note that querying of resource can be done synchronously or asynchronously if (isInitialStep(theJobDetails) && isNotEmpty(theJobDetails.getRemainingSearchUrls()) - && totalSubmitted < myMaxSubmitPerPass) { + && totalSubmitted.get() < myMaxSubmitPerPass) { String nextSearchUrl = theJobDetails.getRemainingSearchUrls().remove(0); RuntimeResourceDefinition resourceDef = UrlUtil.parseUrlResourceType(myFhirContext, nextSearchUrl); @@ -295,144 +295,88 @@ public class SubscriptionTriggeringSvcImpl implements ISubscriptionTriggeringSvc theJobDetails.setCurrentSearchLastUploadedIndex(-1); } - // processing step for synchronous processing mode - if (isNotBlank(theJobDetails.getCurrentSearchUrl()) && totalSubmitted < myMaxSubmitPerPass) { - List allCurrentResources; - - int fromIndex = theJobDetails.getCurrentSearchLastUploadedIndex() + 1; - - String searchUrl = theJobDetails.getCurrentSearchUrl(); - - ourLog.info( - "Triggered job [{}] - Starting synchronous processing at offset {} and index {}", - theJobDetails.getJobId(), - theJobDetails.getCurrentOffset(), - fromIndex); - - int submittableCount = myMaxSubmitPerPass - totalSubmitted; - int toIndex = fromIndex + submittableCount; - - if (nonNull(search) && !search.isEmpty()) { - - // we already have data from the initial step so process as much as we can. - ourLog.info("Triggered job[{}] will process up to {} resources", theJobDetails.getJobId(), toIndex); - allCurrentResources = search.getResources(0, toIndex); - - } else { - if (theJobDetails.getCurrentSearchCount() != null) { - toIndex = Math.min(toIndex, theJobDetails.getCurrentSearchCount()); - } - - RuntimeResourceDefinition resourceDef = UrlUtil.parseUrlResourceType(myFhirContext, searchUrl); - String queryPart = searchUrl.substring(searchUrl.indexOf('?')); - SearchParameterMap params = myMatchUrlService.translateMatchUrl(queryPart, resourceDef); - int offset = theJobDetails.getCurrentOffset() + fromIndex; - params.setOffset(offset); - params.setCount(toIndex); - - ourLog.info( - "Triggered job[{}] requesting {} resources from offset {}", - theJobDetails.getJobId(), - toIndex, - offset); - - search = - mySearchService.executeQuery(resourceDef.getName(), params, RequestPartitionId.allPartitions()); - allCurrentResources = search.getAllResources(); - } - - ourLog.info( - "Triggered job[{}] delivering {} resources", theJobDetails.getJobId(), allCurrentResources.size()); - int highestIndexSubmitted = theJobDetails.getCurrentSearchLastUploadedIndex(); - - for (IBaseResource nextResource : allCurrentResources) { - Future future = submitResource(theJobDetails.getSubscriptionId(), nextResource); - futures.add(Pair.of(nextResource.getIdElement().getIdPart(), future)); - totalSubmitted++; - highestIndexSubmitted++; - } - - if (validateFuturesAndReturnTrueIfWeShouldAbort(futures)) { - return; - } - - theJobDetails.setCurrentSearchLastUploadedIndex(highestIndexSubmitted); - - ourLog.info( - "Triggered job[{}] lastUploadedIndex is {}", - theJobDetails.getJobId(), - theJobDetails.getCurrentSearchLastUploadedIndex()); - - if (allCurrentResources.isEmpty() - || nonNull(theJobDetails.getCurrentSearchCount()) - && toIndex >= theJobDetails.getCurrentSearchCount()) { - ourLog.info( - "Triggered job[{}] for search URL {} has completed ", - theJobDetails.getJobId(), - theJobDetails.getCurrentSearchUrl()); - theJobDetails.setCurrentSearchResourceType(null); - theJobDetails.clearCurrentSearchUrl(); - theJobDetails.setCurrentSearchLastUploadedIndex(-1); - theJobDetails.setCurrentSearchCount(null); - } + /* + * Processing step for synchronous processing mode - This is only called if the + * server is configured to force offset searches, ie using ForceSynchronousSearchInterceptor. + * Otherwise, we'll always do async mode. + */ + if (isNotBlank(theJobDetails.getCurrentSearchUrl()) && totalSubmitted.get() < myMaxSubmitPerPass) { + processSynchronous(theJobDetails, totalSubmitted, futures, search); } // processing step for asynchronous processing mode - if (isNotBlank(theJobDetails.getCurrentSearchUuid()) && totalSubmitted < myMaxSubmitPerPass) { - int fromIndex = theJobDetails.getCurrentSearchLastUploadedIndex() + 1; + if (isNotBlank(theJobDetails.getCurrentSearchUuid()) && totalSubmitted.get() < myMaxSubmitPerPass) { + processAsynchronous(theJobDetails, totalSubmitted, futures); + } - IFhirResourceDao resourceDao = - myDaoRegistry.getResourceDao(theJobDetails.getCurrentSearchResourceType()); + ourLog.info( + "Subscription trigger job[{}] triggered {} resources in {}ms ({} res / second)", + theJobDetails.getJobId(), + totalSubmitted, + sw.getMillis(), + sw.getThroughput(totalSubmitted.get(), TimeUnit.SECONDS)); + } - int maxQuerySize = myMaxSubmitPerPass - totalSubmitted; - int toIndex; - if (theJobDetails.getCurrentSearchCount() != null) { - toIndex = Math.min(fromIndex + maxQuerySize, theJobDetails.getCurrentSearchCount()); - } else { - toIndex = fromIndex + maxQuerySize; - } + private void processAsynchronous( + SubscriptionTriggeringJobDetails theJobDetails, AtomicInteger totalSubmitted, List> futures) { + int fromIndex = theJobDetails.getCurrentSearchLastUploadedIndex() + 1; - ourLog.info( - "Triggering job[{}] search {} requesting resources {} - {}", - theJobDetails.getJobId(), - theJobDetails.getCurrentSearchUuid(), - fromIndex, - toIndex); + IFhirResourceDao resourceDao = myDaoRegistry.getResourceDao(theJobDetails.getCurrentSearchResourceType()); - List> resourceIds; - RequestPartitionId requestPartitionId = RequestPartitionId.allPartitions(); - resourceIds = mySearchCoordinatorSvc.getResources( - theJobDetails.getCurrentSearchUuid(), fromIndex, toIndex, null, requestPartitionId); + int maxQuerySize = myMaxSubmitPerPass - totalSubmitted.get(); + int toIndex; + if (theJobDetails.getCurrentSearchCount() != null) { + toIndex = Math.min(fromIndex + maxQuerySize, theJobDetails.getCurrentSearchCount()); + } else { + toIndex = fromIndex + maxQuerySize; + } - ourLog.info("Triggering job[{}] delivering {} resources", theJobDetails.getJobId(), resourceIds.size()); - int highestIndexSubmitted = theJobDetails.getCurrentSearchLastUploadedIndex(); + ourLog.info( + "Triggering job[{}] search {} requesting resources {} - {}", + theJobDetails.getJobId(), + theJobDetails.getCurrentSearchUuid(), + fromIndex, + toIndex); - String resourceType = myFhirContext.getResourceType(theJobDetails.getCurrentSearchResourceType()); - RuntimeResourceDefinition resourceDef = - myFhirContext.getResourceDefinition(theJobDetails.getCurrentSearchResourceType()); - ISearchBuilder searchBuilder = mySearchBuilderFactory.newSearchBuilder( - resourceDao, resourceType, resourceDef.getImplementingClass()); - List listToPopulate = new ArrayList<>(); + List> allResourceIds; + RequestPartitionId requestPartitionId = RequestPartitionId.allPartitions(); + allResourceIds = mySearchCoordinatorSvc.getResources( + theJobDetails.getCurrentSearchUuid(), fromIndex, toIndex, null, requestPartitionId); - myTransactionService.withSystemRequest().execute(() -> { - searchBuilder.loadResourcesByPid( - resourceIds, Collections.emptyList(), listToPopulate, false, new SystemRequestDetails()); - }); + ourLog.info("Triggering job[{}] delivering {} resources", theJobDetails.getJobId(), allResourceIds.size()); + AtomicInteger highestIndexSubmitted = new AtomicInteger(theJobDetails.getCurrentSearchLastUploadedIndex()); - for (IBaseResource nextResource : listToPopulate) { - Future future = submitResource(theJobDetails.getSubscriptionId(), nextResource); - futures.add(Pair.of(nextResource.getIdElement().getIdPart(), future)); - totalSubmitted++; - highestIndexSubmitted++; - } + List>> partitions = Lists.partition(allResourceIds, 100); + for (List> resourceIds : partitions) { + Runnable job = () -> { + String resourceType = myFhirContext.getResourceType(theJobDetails.getCurrentSearchResourceType()); + RuntimeResourceDefinition resourceDef = + myFhirContext.getResourceDefinition(theJobDetails.getCurrentSearchResourceType()); + ISearchBuilder searchBuilder = mySearchBuilderFactory.newSearchBuilder( + resourceDao, resourceType, resourceDef.getImplementingClass()); + List listToPopulate = new ArrayList<>(); - if (validateFuturesAndReturnTrueIfWeShouldAbort(futures)) { - return; - } + myTransactionService.withRequest(null).execute(() -> { + searchBuilder.loadResourcesByPid( + resourceIds, Collections.emptyList(), listToPopulate, false, new SystemRequestDetails()); + }); - theJobDetails.setCurrentSearchLastUploadedIndex(highestIndexSubmitted); + for (IBaseResource nextResource : listToPopulate) { + submitResource(theJobDetails.getSubscriptionId(), nextResource); + totalSubmitted.incrementAndGet(); + highestIndexSubmitted.incrementAndGet(); + } + }; - if (resourceIds.size() == 0 + Future future = myExecutorService.submit(job); + futures.add(future); + } + + if (!validateFuturesAndReturnTrueIfWeShouldAbort(futures)) { + + theJobDetails.setCurrentSearchLastUploadedIndex(highestIndexSubmitted.get()); + + if (allResourceIds.isEmpty() || (theJobDetails.getCurrentSearchCount() != null && toIndex >= theJobDetails.getCurrentSearchCount())) { ourLog.info( @@ -445,13 +389,93 @@ public class SubscriptionTriggeringSvcImpl implements ISubscriptionTriggeringSvc theJobDetails.setCurrentSearchCount(null); } } + } + + private void processSynchronous( + SubscriptionTriggeringJobDetails theJobDetails, + AtomicInteger totalSubmitted, + List> futures, + IBundleProvider search) { + List allCurrentResources; + + int fromIndex = theJobDetails.getCurrentSearchLastUploadedIndex() + 1; + + String searchUrl = theJobDetails.getCurrentSearchUrl(); ourLog.info( - "Subscription trigger job[{}] triggered {} resources in {}ms ({} res / second)", + "Triggered job [{}] - Starting synchronous processing at offset {} and index {}", theJobDetails.getJobId(), - totalSubmitted, - sw.getMillis(), - sw.getThroughput(totalSubmitted, TimeUnit.SECONDS)); + theJobDetails.getCurrentOffset(), + fromIndex); + + int submittableCount = myMaxSubmitPerPass - totalSubmitted.get(); + int toIndex = fromIndex + submittableCount; + + if (nonNull(search) && !search.isEmpty()) { + + if (search.getCurrentPageSize() != null) { + toIndex = search.getCurrentPageSize(); + } + + // we already have data from the initial step so process as much as we can. + ourLog.info("Triggered job[{}] will process up to {} resources", theJobDetails.getJobId(), toIndex); + allCurrentResources = search.getResources(0, toIndex); + + } else { + if (theJobDetails.getCurrentSearchCount() != null) { + toIndex = Math.min(toIndex, theJobDetails.getCurrentSearchCount()); + } + + RuntimeResourceDefinition resourceDef = UrlUtil.parseUrlResourceType(myFhirContext, searchUrl); + String queryPart = searchUrl.substring(searchUrl.indexOf('?')); + SearchParameterMap params = myMatchUrlService.translateMatchUrl(queryPart, resourceDef); + int offset = theJobDetails.getCurrentOffset() + fromIndex; + params.setOffset(offset); + params.setCount(toIndex); + + ourLog.info( + "Triggered job[{}] requesting {} resources from offset {}", + theJobDetails.getJobId(), + toIndex, + offset); + + search = mySearchService.executeQuery(resourceDef.getName(), params, RequestPartitionId.allPartitions()); + allCurrentResources = search.getResources(0, submittableCount); + } + + ourLog.info("Triggered job[{}] delivering {} resources", theJobDetails.getJobId(), allCurrentResources.size()); + AtomicInteger highestIndexSubmitted = new AtomicInteger(theJobDetails.getCurrentSearchLastUploadedIndex()); + + for (IBaseResource nextResource : allCurrentResources) { + Future future = + myExecutorService.submit(() -> submitResource(theJobDetails.getSubscriptionId(), nextResource)); + futures.add(future); + totalSubmitted.incrementAndGet(); + highestIndexSubmitted.incrementAndGet(); + } + + if (!validateFuturesAndReturnTrueIfWeShouldAbort(futures)) { + + theJobDetails.setCurrentSearchLastUploadedIndex(highestIndexSubmitted.get()); + + ourLog.info( + "Triggered job[{}] lastUploadedIndex is {}", + theJobDetails.getJobId(), + theJobDetails.getCurrentSearchLastUploadedIndex()); + + if (allCurrentResources.isEmpty() + || nonNull(theJobDetails.getCurrentSearchCount()) + && toIndex > theJobDetails.getCurrentSearchCount()) { + ourLog.info( + "Triggered job[{}] for search URL {} has completed ", + theJobDetails.getJobId(), + theJobDetails.getCurrentSearchUrl()); + theJobDetails.setCurrentSearchResourceType(null); + theJobDetails.clearCurrentSearchUrl(); + theJobDetails.setCurrentSearchLastUploadedIndex(-1); + theJobDetails.setCurrentSearchCount(null); + } + } } private boolean isInitialStep(SubscriptionTriggeringJobDetails theJobDetails) { @@ -462,34 +486,31 @@ public class SubscriptionTriggeringSvcImpl implements ISubscriptionTriggeringSvc return isInitialStep(theJobDetails); } - private boolean validateFuturesAndReturnTrueIfWeShouldAbort(List>> theIdToFutures) { + private boolean validateFuturesAndReturnTrueIfWeShouldAbort(List> theFutures) { - for (Pair> next : theIdToFutures) { - String nextDeliveredId = next.getKey(); + for (Future nextFuture : theFutures) { try { - Future nextFuture = next.getValue(); nextFuture.get(); - ourLog.info("Finished redelivering {}", nextDeliveredId); } catch (Exception e) { - ourLog.error("Failure triggering resource " + nextDeliveredId, e); + ourLog.error("Failure triggering resource", e); return true; } } // Clear the list since it will potentially get reused - theIdToFutures.clear(); + theFutures.clear(); return false; } - private Future submitResource(String theSubscriptionId, String theResourceIdToTrigger) { + private void submitResource(String theSubscriptionId, String theResourceIdToTrigger) { org.hl7.fhir.r4.model.IdType resourceId = new org.hl7.fhir.r4.model.IdType(theResourceIdToTrigger); IFhirResourceDao dao = myDaoRegistry.getResourceDao(resourceId.getResourceType()); IBaseResource resourceToTrigger = dao.read(resourceId, SystemRequestDetails.forAllPartitions()); - return submitResource(theSubscriptionId, resourceToTrigger); + submitResource(theSubscriptionId, resourceToTrigger); } - private Future submitResource(String theSubscriptionId, IBaseResource theResourceToTrigger) { + private void submitResource(String theSubscriptionId, IBaseResource theResourceToTrigger) { ourLog.info( "Submitting resource {} to subscription {}", @@ -500,24 +521,23 @@ public class SubscriptionTriggeringSvcImpl implements ISubscriptionTriggeringSvc myFhirContext, theResourceToTrigger, ResourceModifiedMessage.OperationTypeEnum.UPDATE); msg.setSubscriptionId(theSubscriptionId); - return myExecutorService.submit(() -> { - for (int i = 0; ; i++) { - try { - myResourceModifiedConsumer.submitResourceModified(msg); - break; - } catch (Exception e) { - if (i >= 3) { - throw new InternalErrorException(Msg.code(25) + e); - } + for (int i = 0; ; i++) { + try { + myResourceModifiedConsumer.submitResourceModified(msg); + break; + } catch (Exception e) { + if (i >= 3) { + throw new InternalErrorException(Msg.code(25) + e); + } - ourLog.warn( - "Exception while retriggering subscriptions (going to sleep and retry): {}", e.toString()); + ourLog.warn("Exception while retriggering subscriptions (going to sleep and retry): {}", e.toString()); + try { Thread.sleep(1000); + } catch (InterruptedException ex) { + Thread.currentThread().interrupt(); } } - - return null; - }); + } } public void cancelAll() { diff --git a/hapi-fhir-jpaserver-test-dstu3/src/test/java/ca/uhn/fhir/jpa/subscription/resthook/SubscriptionTriggeringDstu3Test.java b/hapi-fhir-jpaserver-test-dstu3/src/test/java/ca/uhn/fhir/jpa/subscription/resthook/SubscriptionTriggeringDstu3Test.java index 79c2f798c1c..16961762a24 100644 --- a/hapi-fhir-jpaserver-test-dstu3/src/test/java/ca/uhn/fhir/jpa/subscription/resthook/SubscriptionTriggeringDstu3Test.java +++ b/hapi-fhir-jpaserver-test-dstu3/src/test/java/ca/uhn/fhir/jpa/subscription/resthook/SubscriptionTriggeringDstu3Test.java @@ -44,6 +44,8 @@ import org.junit.jupiter.api.DisplayName; import org.junit.jupiter.api.Nested; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.TestInstance; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.CsvSource; import org.springframework.beans.factory.annotation.Autowired; import javax.servlet.http.HttpServletRequest; @@ -499,14 +501,18 @@ public class SubscriptionTriggeringDstu3Test extends BaseResourceProviderDstu3Te myInterceptorService.unregisterInterceptor(forceSynchronousSearchInterceptor); } - @Test - public void testTriggerSubscriptionInSynchronousQueryMode() throws Exception { - ((SubscriptionTriggeringSvcImpl)mySubscriptionTriggeringSvc).setMaxSubmitPerPass(10); + @ParameterizedTest + @CsvSource({ + "10", + "10000" + }) + public void testTriggerSubscriptionInSynchronousQueryMode(int theMaxSubmitPerpass) throws Exception { + ((SubscriptionTriggeringSvcImpl)mySubscriptionTriggeringSvc).setMaxSubmitPerPass(theMaxSubmitPerpass); String payload = "application/fhir+json"; IdType sub2id = createSubscription("Patient?", payload, ourListenerServerBase).getIdElement(); - int numberOfPatient = 15; + int numberOfPatient = 200; // Create lots createPatientsAndWait(numberOfPatient); @@ -522,9 +528,9 @@ public class SubscriptionTriggeringDstu3Test extends BaseResourceProviderDstu3Te .withParameter(Parameters.class, ProviderConstants.SUBSCRIPTION_TRIGGERING_PARAM_SEARCH_URL, new StringType("Patient?")) .execute(); - mySubscriptionTriggeringSvc.runDeliveryPass(); - mySubscriptionTriggeringSvc.runDeliveryPass(); - mySubscriptionTriggeringSvc.runDeliveryPass(); + for (int i = 0; i < 20; i++) { + mySubscriptionTriggeringSvc.runDeliveryPass(); + } waitForSize(0, ourCreatedPatients); waitForSize(numberOfPatient, ourUpdatedPatients); diff --git a/hapi-fhir-jpaserver-test-r4/src/test/java/ca/uhn/fhir/jpa/dao/r4/FhirResourceDaoR4QueryCountTest.java b/hapi-fhir-jpaserver-test-r4/src/test/java/ca/uhn/fhir/jpa/dao/r4/FhirResourceDaoR4QueryCountTest.java index 826a7b382c3..e1a535b678b 100644 --- a/hapi-fhir-jpaserver-test-r4/src/test/java/ca/uhn/fhir/jpa/dao/r4/FhirResourceDaoR4QueryCountTest.java +++ b/hapi-fhir-jpaserver-test-r4/src/test/java/ca/uhn/fhir/jpa/dao/r4/FhirResourceDaoR4QueryCountTest.java @@ -19,6 +19,7 @@ import ca.uhn.fhir.jpa.api.model.HistoryCountModeEnum; import ca.uhn.fhir.jpa.dao.data.ISearchParamPresentDao; import ca.uhn.fhir.jpa.entity.TermValueSet; import ca.uhn.fhir.jpa.entity.TermValueSetPreExpansionStatusEnum; +import ca.uhn.fhir.jpa.interceptor.ForceOffsetSearchModeInterceptor; import ca.uhn.fhir.jpa.model.entity.ForcedId; import ca.uhn.fhir.jpa.model.entity.ResourceTable; import ca.uhn.fhir.jpa.model.util.JpaConstants; @@ -27,7 +28,9 @@ import ca.uhn.fhir.jpa.search.PersistedJpaSearchFirstPageBundleProvider; import ca.uhn.fhir.jpa.searchparam.SearchParameterMap; import ca.uhn.fhir.jpa.subscription.submit.svc.ResourceModifiedSubmitterSvc; import ca.uhn.fhir.jpa.subscription.triggering.ISubscriptionTriggeringSvc; +import ca.uhn.fhir.jpa.subscription.triggering.SubscriptionTriggeringSvcImpl; import ca.uhn.fhir.jpa.term.TermReadSvcImpl; +import ca.uhn.fhir.jpa.test.util.SubscriptionTestUtil; import ca.uhn.fhir.jpa.util.SqlQuery; import ca.uhn.fhir.rest.api.Constants; import ca.uhn.fhir.rest.api.RestOperationTypeEnum; @@ -40,9 +43,12 @@ import ca.uhn.fhir.rest.server.SimpleBundleProvider; import ca.uhn.fhir.rest.server.exceptions.PreconditionFailedException; import ca.uhn.fhir.rest.server.interceptor.auth.AuthorizationInterceptor; import ca.uhn.fhir.rest.server.interceptor.auth.PolicyEnum; +import ca.uhn.fhir.rest.server.provider.ProviderConstants; +import ca.uhn.fhir.test.utilities.ProxyUtil; import ca.uhn.fhir.test.utilities.server.HashMapResourceProviderExtension; import ca.uhn.fhir.test.utilities.server.RestfulServerExtension; import ca.uhn.fhir.util.BundleBuilder; +import org.hamcrest.CoreMatchers; import org.hl7.fhir.instance.model.api.IBaseResource; import org.hl7.fhir.instance.model.api.IIdType; import org.hl7.fhir.r4.model.BooleanType; @@ -94,12 +100,16 @@ import java.util.List; import java.util.concurrent.atomic.AtomicInteger; import java.util.function.Supplier; import java.util.stream.Collectors; +import java.util.stream.IntStream; +import static ca.uhn.fhir.jpa.subscription.FhirR4Util.createSubscription; import static org.apache.commons.lang3.StringUtils.countMatches; +import static org.awaitility.Awaitility.await; import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.containsInAnyOrder; import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.empty; +import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.not; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; @@ -145,6 +155,9 @@ public class FhirResourceDaoR4QueryCountTest extends BaseResourceProviderR4Test private ReindexStep myReindexStep; @Autowired private DeleteExpungeStep myDeleteExpungeStep; + @Autowired + protected SubscriptionTestUtil mySubscriptionTestUtil; + @AfterEach public void afterResetDao() { @@ -167,6 +180,8 @@ public class FhirResourceDaoR4QueryCountTest extends BaseResourceProviderR4Test myFhirContext.getParserOptions().setStripVersionsFromReferences(true); TermReadSvcImpl.setForceDisableHibernateSearchForUnitTest(false); + + mySubscriptionTestUtil.unregisterSubscriptionInterceptor(); } @Override @@ -3073,8 +3088,6 @@ public class FhirResourceDaoR4QueryCountTest extends BaseResourceProviderR4Test Bundle input = bb.getBundleTyped(); -// input.getEntry().get(0). - myCaptureQueriesListener.clear(); mySystemDao.transaction(mySrd, input); assertEquals(1, myCaptureQueriesListener.countSelectQueriesForCurrentThread()); @@ -3087,37 +3100,102 @@ public class FhirResourceDaoR4QueryCountTest extends BaseResourceProviderR4Test */ @SuppressWarnings("unchecked") @Test - public void testTriggerSubscription() throws Exception { + public void testTriggerSubscription_Sync() throws Exception { // Setup + IntStream.range(0, 200).forEach(i->createAPatient()); - myStorageSettings.addSupportedSubscriptionType(org.hl7.fhir.dstu2.model.Subscription.SubscriptionChannelType.RESTHOOK); - myResourceModifiedSubmitterSvc.startIfNeeded(); + mySubscriptionTestUtil.registerRestHookInterceptor(); + ForceOffsetSearchModeInterceptor interceptor = new ForceOffsetSearchModeInterceptor(); + myInterceptorRegistry.registerInterceptor(interceptor); + try { + String payload = "application/fhir+json"; + Subscription subscription = createSubscription("Patient?", payload, ourServer.getBaseUrl(), null); + IIdType subscriptionId = mySubscriptionDao.create(subscription, mySrd).getId(); - for (int i = 0; i < 10; i++) { - createPatient(withActiveTrue()); + waitForActivatedSubscriptionCount(1); + + mySubscriptionTriggeringSvc.triggerSubscription(null, List.of(new StringType("Patient?")), subscriptionId); + + // Test + myCaptureQueriesListener.clear(); + mySubscriptionTriggeringSvc.runDeliveryPass(); + mySubscriptionTriggeringSvc.runDeliveryPass(); + mySubscriptionTriggeringSvc.runDeliveryPass(); + mySubscriptionTriggeringSvc.runDeliveryPass(); + mySubscriptionTriggeringSvc.runDeliveryPass(); + myCaptureQueriesListener.logSelectQueries(); + ourPatientProvider.waitForUpdateCount(200); + + // Validate + assertEquals(7, myCaptureQueriesListener.countSelectQueriesForCurrentThread()); + assertEquals(0, myCaptureQueriesListener.countUpdateQueriesForCurrentThread()); + assertEquals(0, myCaptureQueriesListener.countInsertQueriesForCurrentThread()); + assertEquals(0, myCaptureQueriesListener.countDeleteQueriesForCurrentThread()); + } finally { + myInterceptorRegistry.unregisterInterceptor(interceptor); } + } + + + @Test + public void testTriggerSubscription_Async() throws Exception { + // Setup + IntStream.range(0, 200).forEach(i->createAPatient()); + + mySubscriptionTestUtil.registerRestHookInterceptor(); + + String payload = "application/fhir+json"; + Subscription subscription = createSubscription("Patient?", payload, ourServer.getBaseUrl(), null); + IIdType subId = mySubscriptionDao.create(subscription, mySrd).getId(); - Subscription subscription = new Subscription(); - subscription.getChannel().setEndpoint(ourServer.getBaseUrl()); - subscription.getChannel().setType(Subscription.SubscriptionChannelType.RESTHOOK); - subscription.getChannel().setPayload(Constants.CT_FHIR_JSON_NEW); - subscription.setStatus(Subscription.SubscriptionStatus.REQUESTED); - subscription.setCriteria("Patient?active=true"); - IIdType subscriptionId = mySubscriptionDao.create(subscription, mySrd).getId().toUnqualifiedVersionless(); waitForActivatedSubscriptionCount(1); - mySubscriptionTriggeringSvc.triggerSubscription(null, List.of(new StringType("Patient?active=true")), subscriptionId); - // Test myCaptureQueriesListener.clear(); - mySubscriptionTriggeringSvc.runDeliveryPass(); - ourPatientProvider.waitForUpdateCount(10); + Parameters response = myClient + .operation() + .onInstance(subId) + .named(JpaConstants.OPERATION_TRIGGER_SUBSCRIPTION) + .withParameter(Parameters.class, ProviderConstants.SUBSCRIPTION_TRIGGERING_PARAM_SEARCH_URL, new StringType("Patient?")) + .execute(); + String responseValue = response.getParameter().get(0).getValue().primitiveValue(); + assertThat(responseValue, CoreMatchers.containsString("Subscription triggering job submitted as JOB ID")); - // Validate - assertEquals(6, myCaptureQueriesListener.countSelectQueriesForCurrentThread()); - assertEquals(1, myCaptureQueriesListener.countUpdateQueriesForCurrentThread()); - assertEquals(11, myCaptureQueriesListener.countInsertQueriesForCurrentThread()); - assertEquals(0, myCaptureQueriesListener.countDeleteQueriesForCurrentThread()); + assertEquals(3, myCaptureQueriesListener.countSelectQueries()); + assertEquals(0, myCaptureQueriesListener.countInsertQueries()); + assertEquals(0, myCaptureQueriesListener.countUpdateQueries()); + assertEquals(0, myCaptureQueriesListener.countDeleteQueries()); + + myCaptureQueriesListener.clear(); + mySubscriptionTriggeringSvc.runDeliveryPass(); + + myCaptureQueriesListener.logInsertQueries(); + assertEquals(15, myCaptureQueriesListener.countSelectQueries()); + assertEquals(201, myCaptureQueriesListener.countInsertQueries()); + assertEquals(3, myCaptureQueriesListener.countUpdateQueries()); + assertEquals(0, myCaptureQueriesListener.countDeleteQueries()); + + myCaptureQueriesListener.clear(); + mySubscriptionTriggeringSvc.runDeliveryPass(); + + assertEquals(2, myCaptureQueriesListener.countSelectQueries()); + assertEquals(0, myCaptureQueriesListener.countInsertQueries()); + assertEquals(0, myCaptureQueriesListener.countUpdateQueries()); + assertEquals(0, myCaptureQueriesListener.countDeleteQueries()); + + myCaptureQueriesListener.clear(); + mySubscriptionTriggeringSvc.runDeliveryPass(); + + assertEquals(0, myCaptureQueriesListener.countSelectQueries()); + assertEquals(0, myCaptureQueriesListener.countInsertQueries()); + assertEquals(0, myCaptureQueriesListener.countUpdateQueries()); + assertEquals(0, myCaptureQueriesListener.countDeleteQueries()); + + SubscriptionTriggeringSvcImpl svc = ProxyUtil.getSingletonTarget(mySubscriptionTriggeringSvc, SubscriptionTriggeringSvcImpl.class); + assertEquals(0, svc.getActiveJobCount()); + + assertEquals(0, ourPatientProvider.getCountCreate()); + await().until(ourPatientProvider::getCountUpdate, equalTo(200L)); } diff --git a/hapi-fhir-jpaserver-test-r4/src/test/java/ca/uhn/fhir/jpa/subscription/FhirR4Util.java b/hapi-fhir-jpaserver-test-r4/src/test/java/ca/uhn/fhir/jpa/subscription/FhirR4Util.java index fa8ea305503..31f92581c61 100644 --- a/hapi-fhir-jpaserver-test-r4/src/test/java/ca/uhn/fhir/jpa/subscription/FhirR4Util.java +++ b/hapi-fhir-jpaserver-test-r4/src/test/java/ca/uhn/fhir/jpa/subscription/FhirR4Util.java @@ -12,12 +12,14 @@ import org.hl7.fhir.r4.model.Observation; import org.hl7.fhir.r4.model.Patient; import org.hl7.fhir.r4.model.Subscription; +import javax.annotation.Nullable; + public class FhirR4Util { public static final String LPI_CODESYSTEM = "http://cognitivemedicine.com/lpi"; public static final String LPI_CODE = "LPI-FHIR"; - public static Subscription createSubscription(String criteria, String payload, String endpoint, IGenericClient client) { + public static Subscription createSubscription(String criteria, String payload, String endpoint, @Nullable IGenericClient client) { Subscription subscription = new Subscription(); subscription.setReason("Monitor new neonatal function (note, age will be determined by the monitor)"); subscription.setStatus(Subscription.SubscriptionStatus.REQUESTED); @@ -29,8 +31,10 @@ public class FhirR4Util { channel.setEndpoint(endpoint); subscription.setChannel(channel); - MethodOutcome methodOutcome = client.create().resource(subscription).execute(); - subscription.setId(methodOutcome.getId().getIdPart()); + if (client != null) { + MethodOutcome methodOutcome = client.create().resource(subscription).execute(); + subscription.setId(methodOutcome.getId().getIdPart()); + } return subscription; } diff --git a/hapi-fhir-jpaserver-test-r4/src/test/java/ca/uhn/fhir/jpa/subscription/message/MessageSubscriptionR4Test.java b/hapi-fhir-jpaserver-test-r4/src/test/java/ca/uhn/fhir/jpa/subscription/message/MessageSubscriptionR4Test.java index 540f5ae1f7d..2fc3ca8bb20 100644 --- a/hapi-fhir-jpaserver-test-r4/src/test/java/ca/uhn/fhir/jpa/subscription/message/MessageSubscriptionR4Test.java +++ b/hapi-fhir-jpaserver-test-r4/src/test/java/ca/uhn/fhir/jpa/subscription/message/MessageSubscriptionR4Test.java @@ -79,7 +79,8 @@ public class MessageSubscriptionR4Test extends BaseSubscriptionsR4Test { myStorageSettings.setTagStorageMode(new JpaStorageSettings().getTagStorageMode()); } - @BeforeEach + @Override + @BeforeEach public void beforeRegisterRestHookListener() { mySubscriptionTestUtil.registerMessageInterceptor(); diff --git a/hapi-fhir-jpaserver-test-r4/src/test/java/ca/uhn/fhir/jpa/subscription/resthook/RestHookTestR4Test.java b/hapi-fhir-jpaserver-test-r4/src/test/java/ca/uhn/fhir/jpa/subscription/resthook/RestHookTestR4Test.java index ace7684317f..29163d8018c 100644 --- a/hapi-fhir-jpaserver-test-r4/src/test/java/ca/uhn/fhir/jpa/subscription/resthook/RestHookTestR4Test.java +++ b/hapi-fhir-jpaserver-test-r4/src/test/java/ca/uhn/fhir/jpa/subscription/resthook/RestHookTestR4Test.java @@ -5,6 +5,7 @@ import ca.uhn.fhir.jpa.api.config.JpaStorageSettings; import ca.uhn.fhir.jpa.subscription.BaseSubscriptionsR4Test; import ca.uhn.fhir.jpa.subscription.submit.svc.ResourceModifiedSubmitterSvc; import ca.uhn.fhir.jpa.test.util.StoppableSubscriptionDeliveringRestHookSubscriber; +import ca.uhn.fhir.jpa.test.util.SubscriptionTestUtil; import ca.uhn.fhir.jpa.topic.SubscriptionTopicDispatcher; import ca.uhn.fhir.jpa.topic.SubscriptionTopicRegistry; import ca.uhn.fhir.model.primitive.IdDt; diff --git a/hapi-fhir-storage/src/main/java/ca/uhn/fhir/jpa/util/CircularQueueCaptureQueriesListener.java b/hapi-fhir-storage/src/main/java/ca/uhn/fhir/jpa/util/CircularQueueCaptureQueriesListener.java index 3251fc242f0..ed355cdc3fa 100644 --- a/hapi-fhir-storage/src/main/java/ca/uhn/fhir/jpa/util/CircularQueueCaptureQueriesListener.java +++ b/hapi-fhir-storage/src/main/java/ca/uhn/fhir/jpa/util/CircularQueueCaptureQueriesListener.java @@ -208,40 +208,31 @@ public class CircularQueueCaptureQueriesListener extends BaseCaptureQueriesListe * Log all captured UPDATE queries */ public String logUpdateQueriesForCurrentThread() { - List queries = getUpdateQueriesForCurrentThread().stream() - .map(CircularQueueCaptureQueriesListener::formatQueryAsSql) - .collect(Collectors.toList()); - String joined = String.join("\n", queries); + List queries = getUpdateQueriesForCurrentThread(); + List queriesStrings = renderQueriesForLogging(true, true, queries); + String joined = String.join("\n", queriesStrings); ourLog.info("Update Queries:\n{}", joined); return joined; } /** * Log all captured SELECT queries - * - * @return */ public String logSelectQueriesForCurrentThread(int... theIndexes) { - List queries = getSelectQueriesForCurrentThread().stream() - .map(CircularQueueCaptureQueriesListener::formatQueryAsSql) - .collect(Collectors.toList()); + List queries = getSelectQueriesForCurrentThread(); + List queriesStrings = renderQueriesForLogging(true, true, queries); List newList = new ArrayList<>(); if (theIndexes != null && theIndexes.length > 0) { - for (int i = 0; i < theIndexes.length; i++) { - int index = theIndexes[i]; - newList.add("[" + index + "] " + queries.get(index)); - } - } else { - for (int i = 0; i < queries.size(); i++) { - newList.add("[" + i + "] " + queries.get(i)); + for (int index : theIndexes) { + newList.add(queriesStrings.get(index)); } + queriesStrings = newList; } - queries = newList; - String queriesAsString = String.join("\n", queries); - ourLog.info("Select Queries:\n{}", queriesAsString); - return queriesAsString; + String joined = String.join("\n", queriesStrings); + ourLog.info("Select Queries:\n{}", joined); + return joined; } /** @@ -256,20 +247,32 @@ public class CircularQueueCaptureQueriesListener extends BaseCaptureQueriesListe */ public List logSelectQueries(boolean theInlineParams, boolean theFormatSql) { List queries = getSelectQueries(); - List queriesStrings = queries.stream() - .map(t -> CircularQueueCaptureQueriesListener.formatQueryAsSql(t, theInlineParams, theFormatSql)) - .collect(Collectors.toList()); + List queriesStrings = renderQueriesForLogging(theInlineParams, theFormatSql, queries); ourLog.info("Select Queries:\n{}", String.join("\n", queriesStrings)); return queries; } + @Nonnull + private static List renderQueriesForLogging( + boolean theInlineParams, boolean theFormatSql, List queries) { + List queriesStrings = new ArrayList<>(); + for (int i = 0; i < queries.size(); i++) { + SqlQuery query = queries.get(i); + String remderedString = "[" + i + "] " + + CircularQueueCaptureQueriesListener.formatQueryAsSql(query, theInlineParams, theFormatSql); + queriesStrings.add(remderedString); + } + return queriesStrings; + } + /** * Log first captured SELECT query */ public void logFirstSelectQueryForCurrentThread() { + boolean inlineParams = true; String firstSelectQuery = getSelectQueriesForCurrentThread().stream() .findFirst() - .map(CircularQueueCaptureQueriesListener::formatQueryAsSql) + .map(t -> CircularQueueCaptureQueriesListener.formatQueryAsSql(t, inlineParams, inlineParams)) .orElse("NONE FOUND"); ourLog.info("First select SqlQuery:\n{}", firstSelectQuery); } @@ -278,10 +281,9 @@ public class CircularQueueCaptureQueriesListener extends BaseCaptureQueriesListe * Log all captured INSERT queries */ public String logInsertQueriesForCurrentThread() { - List queries = getInsertQueriesForCurrentThread().stream() - .map(CircularQueueCaptureQueriesListener::formatQueryAsSql) - .collect(Collectors.toList()); - String queriesAsString = String.join("\n", queries); + List queries = getInsertQueriesForCurrentThread(); + List queriesStrings = renderQueriesForLogging(true, true, queries); + String queriesAsString = String.join("\n", queriesStrings); ourLog.info("Insert Queries:\n{}", queriesAsString); return queriesAsString; } @@ -290,20 +292,18 @@ public class CircularQueueCaptureQueriesListener extends BaseCaptureQueriesListe * Log all captured queries */ public void logAllQueriesForCurrentThread() { - List queries = getAllQueriesForCurrentThread().stream() - .map(CircularQueueCaptureQueriesListener::formatQueryAsSql) - .collect(Collectors.toList()); - ourLog.info("Queries:\n{}", String.join("\n", queries)); + List queries = getAllQueriesForCurrentThread(); + List queriesStrings = renderQueriesForLogging(true, true, queries); + ourLog.info("Queries:\n{}", String.join("\n", queriesStrings)); } /** * Log all captured queries */ public void logAllQueries() { - List queries = getCapturedQueries().stream() - .map(CircularQueueCaptureQueriesListener::formatQueryAsSql) - .collect(Collectors.toList()); - ourLog.info("Queries:\n{}", String.join("\n", queries)); + List queries = getCapturedQueries(); + List queriesStrings = renderQueriesForLogging(true, true, queries); + ourLog.info("Queries:\n{}", String.join("\n", queriesStrings)); } /** @@ -317,10 +317,12 @@ public class CircularQueueCaptureQueriesListener extends BaseCaptureQueriesListe * Log all captured INSERT queries */ public int logInsertQueries(Predicate theInclusionPredicate) { - List insertQueries = getInsertQueries(); - List queries = insertQueries.stream() + List insertQueries = getInsertQueries().stream() .filter(t -> theInclusionPredicate == null || theInclusionPredicate.test(t)) - .map(CircularQueueCaptureQueriesListener::formatQueryAsSql) + .collect(Collectors.toList()); + boolean inlineParams = true; + List queries = insertQueries.stream() + .map(t -> CircularQueueCaptureQueriesListener.formatQueryAsSql(t, inlineParams, inlineParams)) .collect(Collectors.toList()); ourLog.info("Insert Queries:\n{}", String.join("\n", queries)); @@ -331,23 +333,20 @@ public class CircularQueueCaptureQueriesListener extends BaseCaptureQueriesListe * Log all captured INSERT queries */ public int logUpdateQueries() { - List updateQueries = getUpdateQueries(); - List queries = updateQueries.stream() - .map(CircularQueueCaptureQueriesListener::formatQueryAsSql) - .collect(Collectors.toList()); - ourLog.info("Update Queries:\n{}", String.join("\n", queries)); + List queries = getUpdateQueries(); + List queriesStrings = renderQueriesForLogging(true, true, queries); + ourLog.info("Update Queries:\n{}", String.join("\n", queriesStrings)); - return countQueries(updateQueries); + return countQueries(queries); } /** * Log all captured DELETE queries */ public String logDeleteQueriesForCurrentThread() { - List queries = getDeleteQueriesForCurrentThread().stream() - .map(CircularQueueCaptureQueriesListener::formatQueryAsSql) - .collect(Collectors.toList()); - String joined = String.join("\n", queries); + List queries = getDeleteQueriesForCurrentThread(); + List queriesStrings = renderQueriesForLogging(true, true, queries); + String joined = String.join("\n", queriesStrings); ourLog.info("Delete Queries:\n{}", joined); return joined; } @@ -356,13 +355,11 @@ public class CircularQueueCaptureQueriesListener extends BaseCaptureQueriesListe * Log all captured DELETE queries */ public int logDeleteQueries() { - List deleteQueries = getDeleteQueries(); - List queries = deleteQueries.stream() - .map(CircularQueueCaptureQueriesListener::formatQueryAsSql) - .collect(Collectors.toList()); - ourLog.info("Delete Queries:\n{}", String.join("\n", queries)); + List queries = getDeleteQueries(); + List queriesStrings = renderQueriesForLogging(true, true, queries); + ourLog.info("Delete Queries:\n{}", String.join("\n", queriesStrings)); - return countQueries(deleteQueries); + return countQueries(queries); } public int countSelectQueries() { diff --git a/hapi-tinder-plugin/src/main/resources/vm/jpa_spring_beans_java.vm b/hapi-tinder-plugin/src/main/resources/vm/jpa_spring_beans_java.vm index 1cb6f5e6102..a3e2d44a42d 100644 --- a/hapi-tinder-plugin/src/main/resources/vm/jpa_spring_beans_java.vm +++ b/hapi-tinder-plugin/src/main/resources/vm/jpa_spring_beans_java.vm @@ -5,7 +5,6 @@ import java.util.List; import java.util.concurrent.Executor; import java.util.concurrent.Executors; -import javax.persistence.EntityManager; import org.springframework.transaction.PlatformTransactionManager; import ca.uhn.fhir.jpa.api.config.JpaStorageSettings;