diff --git a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/subscription/BaseSubscriptionInterceptor.java b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/subscription/BaseSubscriptionInterceptor.java index 0e04cd4d082..dd9f9a0ac65 100644 --- a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/subscription/BaseSubscriptionInterceptor.java +++ b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/subscription/BaseSubscriptionInterceptor.java @@ -9,9 +9,9 @@ package ca.uhn.fhir.jpa.subscription; * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@ -76,6 +76,7 @@ public abstract class BaseSubscriptionInterceptor exten static final String SUBSCRIPTION_STATUS = "Subscription.status"; static final String SUBSCRIPTION_TYPE = "Subscription.channel.type"; private static final Integer MAX_SUBSCRIPTION_RESULTS = 1000; + private final Object myInitSubscriptionsLock = new Object(); private SubscribableChannel myProcessingChannel; private Map myDeliveryChannel; private ExecutorService myProcessingExecutor; @@ -362,39 +363,41 @@ public abstract class BaseSubscriptionInterceptor exten } public Integer doInitSubscriptions() { - ourLog.debug("Starting init subscriptions"); - SearchParameterMap map = new SearchParameterMap(); - map.add(Subscription.SP_TYPE, new TokenParam(null, getChannelType().toCode())); - map.add(Subscription.SP_STATUS, new TokenOrListParam() - .addOr(new TokenParam(null, Subscription.SubscriptionStatus.REQUESTED.toCode())) - .addOr(new TokenParam(null, Subscription.SubscriptionStatus.ACTIVE.toCode()))); - map.setLoadSynchronousUpTo(MAX_SUBSCRIPTION_RESULTS); + synchronized (myInitSubscriptionsLock) { + ourLog.debug("Starting init subscriptions"); + SearchParameterMap map = new SearchParameterMap(); + map.add(Subscription.SP_TYPE, new TokenParam(null, getChannelType().toCode())); + map.add(Subscription.SP_STATUS, new TokenOrListParam() + .addOr(new TokenParam(null, Subscription.SubscriptionStatus.REQUESTED.toCode())) + .addOr(new TokenParam(null, Subscription.SubscriptionStatus.ACTIVE.toCode()))); + map.setLoadSynchronousUpTo(MAX_SUBSCRIPTION_RESULTS); - RequestDetails req = new ServletSubRequestDetails(); - req.setSubRequest(true); + RequestDetails req = new ServletSubRequestDetails(); + req.setSubRequest(true); - IBundleProvider subscriptionBundleList = getSubscriptionDao().search(map, req); - if (subscriptionBundleList.size() >= MAX_SUBSCRIPTION_RESULTS) { - ourLog.error("Currently over " + MAX_SUBSCRIPTION_RESULTS + " subscriptions. Some subscriptions have not been loaded."); - } - - List resourceList = subscriptionBundleList.getResources(0, subscriptionBundleList.size()); - - Set allIds = new HashSet<>(); - int changesCount = 0; - for (IBaseResource resource : resourceList) { - String nextId = resource.getIdElement().getIdPart(); - allIds.add(nextId); - boolean changed = mySubscriptionActivatingSubscriber.activateOrRegisterSubscriptionIfRequired(resource); - if (changed) { - changesCount++; + IBundleProvider subscriptionBundleList = getSubscriptionDao().search(map, req); + if (subscriptionBundleList.size() >= MAX_SUBSCRIPTION_RESULTS) { + ourLog.error("Currently over " + MAX_SUBSCRIPTION_RESULTS + " subscriptions. Some subscriptions have not been loaded."); } + + List resourceList = subscriptionBundleList.getResources(0, subscriptionBundleList.size()); + + Set allIds = new HashSet<>(); + int changesCount = 0; + for (IBaseResource resource : resourceList) { + String nextId = resource.getIdElement().getIdPart(); + allIds.add(nextId); + boolean changed = mySubscriptionActivatingSubscriber.activateOrRegisterSubscriptionIfRequired(resource); + if (changed) { + changesCount++; + } + } + + unregisterAllSubscriptionsNotInCollection(allIds); + ourLog.trace("Finished init subscriptions - found {}", resourceList.size()); + + return changesCount; } - - unregisterAllSubscriptionsNotInCollection(allIds); - ourLog.trace("Finished init subscriptions - found {}", resourceList.size()); - - return changesCount; } @SuppressWarnings("unused") diff --git a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/subscription/CanonicalSubscription.java b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/subscription/CanonicalSubscription.java index 8115ff8309e..75b4126b4b7 100644 --- a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/subscription/CanonicalSubscription.java +++ b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/subscription/CanonicalSubscription.java @@ -9,9 +9,9 @@ package ca.uhn.fhir.jpa.subscription; * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. diff --git a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/subscription/SubscriptionActivatingSubscriber.java b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/subscription/SubscriptionActivatingSubscriber.java index fba4f162aef..7e78222f59e 100644 --- a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/subscription/SubscriptionActivatingSubscriber.java +++ b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/subscription/SubscriptionActivatingSubscriber.java @@ -9,9 +9,9 @@ package ca.uhn.fhir.jpa.subscription; * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. diff --git a/hapi-fhir-jpaserver-base/src/test/java/ca/uhn/fhir/jpa/dao/r4/FhirResourceDaoR4SearchPageExpiryTest.java b/hapi-fhir-jpaserver-base/src/test/java/ca/uhn/fhir/jpa/dao/r4/FhirResourceDaoR4SearchPageExpiryTest.java index 9b0ae526bd2..9b22d036883 100644 --- a/hapi-fhir-jpaserver-base/src/test/java/ca/uhn/fhir/jpa/dao/r4/FhirResourceDaoR4SearchPageExpiryTest.java +++ b/hapi-fhir-jpaserver-base/src/test/java/ca/uhn/fhir/jpa/dao/r4/FhirResourceDaoR4SearchPageExpiryTest.java @@ -4,6 +4,7 @@ import ca.uhn.fhir.jpa.dao.DaoConfig; import ca.uhn.fhir.jpa.dao.SearchParameterMap; import ca.uhn.fhir.jpa.dao.data.ISearchDao; import ca.uhn.fhir.jpa.entity.Search; +import ca.uhn.fhir.jpa.entity.SearchStatusEnum; import ca.uhn.fhir.jpa.search.StaleSearchDeletingSvcImpl; import ca.uhn.fhir.util.StopWatch; import ca.uhn.fhir.rest.api.server.IBundleProvider; @@ -32,6 +33,7 @@ import static ca.uhn.fhir.jpa.util.TestUtil.sleepAtLeast; import static org.hamcrest.Matchers.containsInAnyOrder; import static org.junit.Assert.*; +@SuppressWarnings("Duplicates") public class FhirResourceDaoR4SearchPageExpiryTest extends BaseJpaR4Test { private static final Logger ourLog = LoggerFactory.getLogger(FhirResourceDaoR4SearchPageExpiryTest.class); @@ -404,7 +406,7 @@ public class FhirResourceDaoR4SearchPageExpiryTest extends BaseJpaR4Test { Search search = null; for (int i = 0; i < 20 && search == null; i++) { search = theSearchEntityDao.findByUuid(theUuid); - if (search == null) { + if (search == null || search.getStatus() == SearchStatusEnum.LOADING) { sleepAtLeast(100); } } diff --git a/hapi-fhir-jpaserver-base/src/test/java/ca/uhn/fhir/jpa/provider/r4/BaseResourceProviderR4Test.java b/hapi-fhir-jpaserver-base/src/test/java/ca/uhn/fhir/jpa/provider/r4/BaseResourceProviderR4Test.java index 3f8149b8c1a..b1f0d7a8cb2 100644 --- a/hapi-fhir-jpaserver-base/src/test/java/ca/uhn/fhir/jpa/provider/r4/BaseResourceProviderR4Test.java +++ b/hapi-fhir-jpaserver-base/src/test/java/ca/uhn/fhir/jpa/provider/r4/BaseResourceProviderR4Test.java @@ -15,6 +15,7 @@ import ca.uhn.fhir.rest.client.api.IGenericClient; import ca.uhn.fhir.rest.client.api.ServerValidationModeEnum; import ca.uhn.fhir.rest.client.interceptor.LoggingInterceptor; import ca.uhn.fhir.rest.server.RestfulServer; +import ca.uhn.fhir.rest.server.exceptions.ResourceVersionConflictException; import ca.uhn.fhir.rest.server.interceptor.CorsInterceptor; import ca.uhn.fhir.util.PortUtil; import ca.uhn.fhir.util.TestUtil; @@ -46,6 +47,7 @@ import java.util.List; import java.util.concurrent.TimeUnit; import static org.apache.commons.lang3.StringUtils.isNotBlank; +import static org.junit.Assert.fail; public abstract class BaseResourceProviderR4Test extends BaseJpaR4Test { @@ -205,6 +207,18 @@ public abstract class BaseResourceProviderR4Test extends BaseJpaR4Test { } protected void waitForRegisteredSubscriptionCount(int theSize) throws Exception { + for (int i = 0; i++;) { + if (i == 10) { + fail("Failed to init subscriptions"); + } + try { + getRestHookSubscriptionInterceptor().doInitSubscriptions(); + break; + } catch (ResourceVersionConflictException e) { + Thread.sleep(250); + } + } + SubscriptionRestHookInterceptor interceptor = getRestHookSubscriptionInterceptor(); TestUtil.waitForSize(theSize, () -> interceptor.getRegisteredSubscriptions().size()); Thread.sleep(500);