Test fixes
This commit is contained in:
parent
6511545d25
commit
77e35f5a56
|
@ -9,9 +9,9 @@ package ca.uhn.fhir.jpa.subscription;
|
||||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
* you may not use this file except in compliance with the License.
|
* you may not use this file except in compliance with the License.
|
||||||
* You may obtain a copy of the License at
|
* You may obtain a copy of the License at
|
||||||
*
|
*
|
||||||
* http://www.apache.org/licenses/LICENSE-2.0
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
*
|
*
|
||||||
* Unless required by applicable law or agreed to in writing, software
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
@ -76,6 +76,7 @@ public abstract class BaseSubscriptionInterceptor<S extends IBaseResource> exten
|
||||||
static final String SUBSCRIPTION_STATUS = "Subscription.status";
|
static final String SUBSCRIPTION_STATUS = "Subscription.status";
|
||||||
static final String SUBSCRIPTION_TYPE = "Subscription.channel.type";
|
static final String SUBSCRIPTION_TYPE = "Subscription.channel.type";
|
||||||
private static final Integer MAX_SUBSCRIPTION_RESULTS = 1000;
|
private static final Integer MAX_SUBSCRIPTION_RESULTS = 1000;
|
||||||
|
private final Object myInitSubscriptionsLock = new Object();
|
||||||
private SubscribableChannel myProcessingChannel;
|
private SubscribableChannel myProcessingChannel;
|
||||||
private Map<String, SubscribableChannel> myDeliveryChannel;
|
private Map<String, SubscribableChannel> myDeliveryChannel;
|
||||||
private ExecutorService myProcessingExecutor;
|
private ExecutorService myProcessingExecutor;
|
||||||
|
@ -362,39 +363,41 @@ public abstract class BaseSubscriptionInterceptor<S extends IBaseResource> exten
|
||||||
}
|
}
|
||||||
|
|
||||||
public Integer doInitSubscriptions() {
|
public Integer doInitSubscriptions() {
|
||||||
ourLog.debug("Starting init subscriptions");
|
synchronized (myInitSubscriptionsLock) {
|
||||||
SearchParameterMap map = new SearchParameterMap();
|
ourLog.debug("Starting init subscriptions");
|
||||||
map.add(Subscription.SP_TYPE, new TokenParam(null, getChannelType().toCode()));
|
SearchParameterMap map = new SearchParameterMap();
|
||||||
map.add(Subscription.SP_STATUS, new TokenOrListParam()
|
map.add(Subscription.SP_TYPE, new TokenParam(null, getChannelType().toCode()));
|
||||||
.addOr(new TokenParam(null, Subscription.SubscriptionStatus.REQUESTED.toCode()))
|
map.add(Subscription.SP_STATUS, new TokenOrListParam()
|
||||||
.addOr(new TokenParam(null, Subscription.SubscriptionStatus.ACTIVE.toCode())));
|
.addOr(new TokenParam(null, Subscription.SubscriptionStatus.REQUESTED.toCode()))
|
||||||
map.setLoadSynchronousUpTo(MAX_SUBSCRIPTION_RESULTS);
|
.addOr(new TokenParam(null, Subscription.SubscriptionStatus.ACTIVE.toCode())));
|
||||||
|
map.setLoadSynchronousUpTo(MAX_SUBSCRIPTION_RESULTS);
|
||||||
|
|
||||||
RequestDetails req = new ServletSubRequestDetails();
|
RequestDetails req = new ServletSubRequestDetails();
|
||||||
req.setSubRequest(true);
|
req.setSubRequest(true);
|
||||||
|
|
||||||
IBundleProvider subscriptionBundleList = getSubscriptionDao().search(map, req);
|
IBundleProvider subscriptionBundleList = getSubscriptionDao().search(map, req);
|
||||||
if (subscriptionBundleList.size() >= MAX_SUBSCRIPTION_RESULTS) {
|
if (subscriptionBundleList.size() >= MAX_SUBSCRIPTION_RESULTS) {
|
||||||
ourLog.error("Currently over " + MAX_SUBSCRIPTION_RESULTS + " subscriptions. Some subscriptions have not been loaded.");
|
ourLog.error("Currently over " + MAX_SUBSCRIPTION_RESULTS + " subscriptions. Some subscriptions have not been loaded.");
|
||||||
}
|
|
||||||
|
|
||||||
List<IBaseResource> resourceList = subscriptionBundleList.getResources(0, subscriptionBundleList.size());
|
|
||||||
|
|
||||||
Set<String> allIds = new HashSet<>();
|
|
||||||
int changesCount = 0;
|
|
||||||
for (IBaseResource resource : resourceList) {
|
|
||||||
String nextId = resource.getIdElement().getIdPart();
|
|
||||||
allIds.add(nextId);
|
|
||||||
boolean changed = mySubscriptionActivatingSubscriber.activateOrRegisterSubscriptionIfRequired(resource);
|
|
||||||
if (changed) {
|
|
||||||
changesCount++;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
List<IBaseResource> resourceList = subscriptionBundleList.getResources(0, subscriptionBundleList.size());
|
||||||
|
|
||||||
|
Set<String> allIds = new HashSet<>();
|
||||||
|
int changesCount = 0;
|
||||||
|
for (IBaseResource resource : resourceList) {
|
||||||
|
String nextId = resource.getIdElement().getIdPart();
|
||||||
|
allIds.add(nextId);
|
||||||
|
boolean changed = mySubscriptionActivatingSubscriber.activateOrRegisterSubscriptionIfRequired(resource);
|
||||||
|
if (changed) {
|
||||||
|
changesCount++;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
unregisterAllSubscriptionsNotInCollection(allIds);
|
||||||
|
ourLog.trace("Finished init subscriptions - found {}", resourceList.size());
|
||||||
|
|
||||||
|
return changesCount;
|
||||||
}
|
}
|
||||||
|
|
||||||
unregisterAllSubscriptionsNotInCollection(allIds);
|
|
||||||
ourLog.trace("Finished init subscriptions - found {}", resourceList.size());
|
|
||||||
|
|
||||||
return changesCount;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@SuppressWarnings("unused")
|
@SuppressWarnings("unused")
|
||||||
|
|
|
@ -9,9 +9,9 @@ package ca.uhn.fhir.jpa.subscription;
|
||||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
* you may not use this file except in compliance with the License.
|
* you may not use this file except in compliance with the License.
|
||||||
* You may obtain a copy of the License at
|
* You may obtain a copy of the License at
|
||||||
*
|
*
|
||||||
* http://www.apache.org/licenses/LICENSE-2.0
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
*
|
*
|
||||||
* Unless required by applicable law or agreed to in writing, software
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
|
|
@ -9,9 +9,9 @@ package ca.uhn.fhir.jpa.subscription;
|
||||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
* you may not use this file except in compliance with the License.
|
* you may not use this file except in compliance with the License.
|
||||||
* You may obtain a copy of the License at
|
* You may obtain a copy of the License at
|
||||||
*
|
*
|
||||||
* http://www.apache.org/licenses/LICENSE-2.0
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
*
|
*
|
||||||
* Unless required by applicable law or agreed to in writing, software
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
|
|
@ -4,6 +4,7 @@ import ca.uhn.fhir.jpa.dao.DaoConfig;
|
||||||
import ca.uhn.fhir.jpa.dao.SearchParameterMap;
|
import ca.uhn.fhir.jpa.dao.SearchParameterMap;
|
||||||
import ca.uhn.fhir.jpa.dao.data.ISearchDao;
|
import ca.uhn.fhir.jpa.dao.data.ISearchDao;
|
||||||
import ca.uhn.fhir.jpa.entity.Search;
|
import ca.uhn.fhir.jpa.entity.Search;
|
||||||
|
import ca.uhn.fhir.jpa.entity.SearchStatusEnum;
|
||||||
import ca.uhn.fhir.jpa.search.StaleSearchDeletingSvcImpl;
|
import ca.uhn.fhir.jpa.search.StaleSearchDeletingSvcImpl;
|
||||||
import ca.uhn.fhir.util.StopWatch;
|
import ca.uhn.fhir.util.StopWatch;
|
||||||
import ca.uhn.fhir.rest.api.server.IBundleProvider;
|
import ca.uhn.fhir.rest.api.server.IBundleProvider;
|
||||||
|
@ -32,6 +33,7 @@ import static ca.uhn.fhir.jpa.util.TestUtil.sleepAtLeast;
|
||||||
import static org.hamcrest.Matchers.containsInAnyOrder;
|
import static org.hamcrest.Matchers.containsInAnyOrder;
|
||||||
import static org.junit.Assert.*;
|
import static org.junit.Assert.*;
|
||||||
|
|
||||||
|
@SuppressWarnings("Duplicates")
|
||||||
public class FhirResourceDaoR4SearchPageExpiryTest extends BaseJpaR4Test {
|
public class FhirResourceDaoR4SearchPageExpiryTest extends BaseJpaR4Test {
|
||||||
private static final Logger ourLog = LoggerFactory.getLogger(FhirResourceDaoR4SearchPageExpiryTest.class);
|
private static final Logger ourLog = LoggerFactory.getLogger(FhirResourceDaoR4SearchPageExpiryTest.class);
|
||||||
|
|
||||||
|
@ -404,7 +406,7 @@ public class FhirResourceDaoR4SearchPageExpiryTest extends BaseJpaR4Test {
|
||||||
Search search = null;
|
Search search = null;
|
||||||
for (int i = 0; i < 20 && search == null; i++) {
|
for (int i = 0; i < 20 && search == null; i++) {
|
||||||
search = theSearchEntityDao.findByUuid(theUuid);
|
search = theSearchEntityDao.findByUuid(theUuid);
|
||||||
if (search == null) {
|
if (search == null || search.getStatus() == SearchStatusEnum.LOADING) {
|
||||||
sleepAtLeast(100);
|
sleepAtLeast(100);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -15,6 +15,7 @@ import ca.uhn.fhir.rest.client.api.IGenericClient;
|
||||||
import ca.uhn.fhir.rest.client.api.ServerValidationModeEnum;
|
import ca.uhn.fhir.rest.client.api.ServerValidationModeEnum;
|
||||||
import ca.uhn.fhir.rest.client.interceptor.LoggingInterceptor;
|
import ca.uhn.fhir.rest.client.interceptor.LoggingInterceptor;
|
||||||
import ca.uhn.fhir.rest.server.RestfulServer;
|
import ca.uhn.fhir.rest.server.RestfulServer;
|
||||||
|
import ca.uhn.fhir.rest.server.exceptions.ResourceVersionConflictException;
|
||||||
import ca.uhn.fhir.rest.server.interceptor.CorsInterceptor;
|
import ca.uhn.fhir.rest.server.interceptor.CorsInterceptor;
|
||||||
import ca.uhn.fhir.util.PortUtil;
|
import ca.uhn.fhir.util.PortUtil;
|
||||||
import ca.uhn.fhir.util.TestUtil;
|
import ca.uhn.fhir.util.TestUtil;
|
||||||
|
@ -46,6 +47,7 @@ import java.util.List;
|
||||||
import java.util.concurrent.TimeUnit;
|
import java.util.concurrent.TimeUnit;
|
||||||
|
|
||||||
import static org.apache.commons.lang3.StringUtils.isNotBlank;
|
import static org.apache.commons.lang3.StringUtils.isNotBlank;
|
||||||
|
import static org.junit.Assert.fail;
|
||||||
|
|
||||||
public abstract class BaseResourceProviderR4Test extends BaseJpaR4Test {
|
public abstract class BaseResourceProviderR4Test extends BaseJpaR4Test {
|
||||||
|
|
||||||
|
@ -205,6 +207,18 @@ public abstract class BaseResourceProviderR4Test extends BaseJpaR4Test {
|
||||||
}
|
}
|
||||||
|
|
||||||
protected void waitForRegisteredSubscriptionCount(int theSize) throws Exception {
|
protected void waitForRegisteredSubscriptionCount(int theSize) throws Exception {
|
||||||
|
for (int i = 0; i++;) {
|
||||||
|
if (i == 10) {
|
||||||
|
fail("Failed to init subscriptions");
|
||||||
|
}
|
||||||
|
try {
|
||||||
|
getRestHookSubscriptionInterceptor().doInitSubscriptions();
|
||||||
|
break;
|
||||||
|
} catch (ResourceVersionConflictException e) {
|
||||||
|
Thread.sleep(250);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
SubscriptionRestHookInterceptor interceptor = getRestHookSubscriptionInterceptor();
|
SubscriptionRestHookInterceptor interceptor = getRestHookSubscriptionInterceptor();
|
||||||
TestUtil.waitForSize(theSize, () -> interceptor.getRegisteredSubscriptions().size());
|
TestUtil.waitForSize(theSize, () -> interceptor.getRegisteredSubscriptions().size());
|
||||||
Thread.sleep(500);
|
Thread.sleep(500);
|
||||||
|
|
Loading…
Reference in New Issue