Subscription cleanup

This commit is contained in:
James Agnew 2018-08-10 09:07:55 -04:00
parent ec4604c498
commit ece0c9defe
9 changed files with 46 additions and 145 deletions

View File

@ -108,7 +108,7 @@ public abstract class BaseCommand implements Comparable<BaseCommand> {
if (theOptions.getOption(theOpt) != null) {
throw new IllegalStateException("Duplicate option: " + theOpt);
}
if (theOptionGroup != null && theOptionGroup.getOptions().stream().anyMatch(t-> theOpt.equals(t.getOpt()))) {
if (theOptionGroup != null && theOptionGroup.getOptions().stream().anyMatch(t -> theOpt.equals(t.getOpt()))) {
throw new IllegalStateException("Duplicate option: " + theOpt);
}
}
@ -116,7 +116,7 @@ public abstract class BaseCommand implements Comparable<BaseCommand> {
if (theOptions.getOption(theLongOpt) != null) {
throw new IllegalStateException("Duplicate option: " + theLongOpt);
}
if (theOptionGroup != null && theOptionGroup.getOptions().stream().anyMatch(t-> theLongOpt.equals(t.getLongOpt()))) {
if (theOptionGroup != null && theOptionGroup.getOptions().stream().anyMatch(t -> theLongOpt.equals(t.getLongOpt()))) {
throw new IllegalStateException("Duplicate option: " + theOpt);
}
}
@ -362,8 +362,12 @@ public abstract class BaseCommand implements Comparable<BaseCommand> {
throw new ParseException("Invalid target server specified, must begin with 'http' or 'file'.");
}
return newClientWithBaseUrl(theCommandLine, baseUrl, theBasicAuthOptionName, theBearerTokenOptionName);
}
protected IGenericClient newClientWithBaseUrl(CommandLine theCommandLine, String theBaseUrl, String theBasicAuthOptionName, String theBearerTokenOptionName) {
myFhirCtx.getRestfulClientFactory().setSocketTimeout(10 * 60 * 1000);
IGenericClient retVal = myFhirCtx.newRestfulGenericClient(baseUrl);
IGenericClient retVal = myFhirCtx.newRestfulGenericClient(theBaseUrl);
String basicAuthHeaderValue = getAndParseOptionBasicAuthHeader(theCommandLine, theBasicAuthOptionName);
if (isNotBlank(basicAuthHeaderValue)) {

View File

@ -706,9 +706,9 @@ public class DaoConfig {
* references instead of being treated as real references.
* <p>
* A logical reference is a reference which is treated as an identifier, and
* does not neccesarily resolve. See {@link "http://hl7.org/fhir/references.html"} for
* does not neccesarily resolve. See <a href="http://hl7.org/fhir/references.html">references</a> for
* a description of logical references. For example, the valueset
* {@link "http://hl7.org/fhir/valueset-quantity-comparator.html"} is a logical
* <a href="http://hl7.org/fhir/valueset-quantity-comparator.html">valueset-quantity-comparator</a> is a logical
* reference.
* </p>
* <p>
@ -731,9 +731,9 @@ public class DaoConfig {
* references instead of being treated as real references.
* <p>
* A logical reference is a reference which is treated as an identifier, and
* does not neccesarily resolve. See {@link "http://hl7.org/fhir/references.html"} for
* does not neccesarily resolve. See <a href="http://hl7.org/fhir/references.html">references</a> for
* a description of logical references. For example, the valueset
* {@link "http://hl7.org/fhir/valueset-quantity-comparator.html"} is a logical
* <a href="http://hl7.org/fhir/valueset-quantity-comparator.html">valueset-quantity-comparator</a> is a logical
* reference.
* </p>
* <p>

View File

@ -50,6 +50,7 @@ import org.hl7.fhir.instance.model.api.IIdType;
import org.hl7.fhir.r4.model.Subscription;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.core.task.AsyncTaskExecutor;
@ -82,7 +83,7 @@ public abstract class BaseSubscriptionInterceptor<S extends IBaseResource> exten
private SubscriptionActivatingSubscriber mySubscriptionActivatingSubscriber;
private MessageHandler mySubscriptionCheckingSubscriber;
private ConcurrentHashMap<String, CanonicalSubscription> myIdToSubscription = new ConcurrentHashMap<>();
private ConcurrentHashMap<String, SubscribableChannel> myIdToSubscribaleChannel = new ConcurrentHashMap<>();
private ConcurrentHashMap<String, SubscribableChannel> mySubscribableChannel = new ConcurrentHashMap<>();
private Multimap<String, MessageHandler> myIdToDeliveryHandler = Multimaps.synchronizedListMultimap(ArrayListMultimap.create());
private Logger ourLog = LoggerFactory.getLogger(BaseSubscriptionInterceptor.class);
private ThreadPoolExecutor myDeliveryExecutor;
@ -301,7 +302,7 @@ public abstract class BaseSubscriptionInterceptor<S extends IBaseResource> exten
}
protected MessageChannel getDeliveryChannel(CanonicalSubscription theSubscription) {
return myIdToSubscribaleChannel.get(theSubscription.getIdElement(myCtx).getIdPart());
return mySubscribableChannel.get(theSubscription.getIdElement(myCtx).getIdPart());
}
public int getExecutorQueueSizeForUnitTests() {
@ -384,7 +385,7 @@ public abstract class BaseSubscriptionInterceptor<S extends IBaseResource> exten
}
public void registerHandler(String theSubscriptionId, MessageHandler theHandler) {
myIdToSubscribaleChannel.get(theSubscriptionId).subscribe(theHandler);
mySubscribableChannel.get(theSubscriptionId).subscribe(theHandler);
myIdToDeliveryHandler.put(theSubscriptionId, theHandler);
}
@ -399,7 +400,7 @@ public abstract class BaseSubscriptionInterceptor<S extends IBaseResource> exten
SubscribableChannel deliveryChannel = createDeliveryChannel(canonicalized);
Optional<MessageHandler> deliveryHandler = createDeliveryHandler(canonicalized);
myIdToSubscribaleChannel.put(subscriptionId, deliveryChannel);
mySubscribableChannel.put(subscriptionId, deliveryChannel);
myIdToSubscription.put(subscriptionId, canonicalized);
deliveryHandler.ifPresent(handler -> registerHandler(subscriptionId, handler));
@ -545,13 +546,20 @@ public abstract class BaseSubscriptionInterceptor<S extends IBaseResource> exten
}
}
public void unregisterHandler(String theSubscriptionId, MessageHandler next) {
SubscribableChannel channel = myIdToSubscribaleChannel.get(theSubscriptionId);
public void unregisterHandler(String theSubscriptionId, MessageHandler theMessageHandler) {
SubscribableChannel channel = mySubscribableChannel.get(theSubscriptionId);
if (channel != null) {
channel.unsubscribe(next);
if (channel instanceof DisposableBean) {
try {
((DisposableBean) channel).destroy();
} catch (Exception e) {
ourLog.error("Failed to destroy channel bean", e);
}
}
channel.unsubscribe(theMessageHandler);
}
myIdToSubscribaleChannel.remove(theSubscriptionId, next);
mySubscribableChannel.remove(theSubscriptionId, theMessageHandler);
}
@SuppressWarnings("UnusedReturnValue")
@ -565,7 +573,7 @@ public abstract class BaseSubscriptionInterceptor<S extends IBaseResource> exten
unregisterHandler(subscriptionId, next);
}
myIdToSubscribaleChannel.remove(subscriptionId);
mySubscribableChannel.remove(subscriptionId);
return myIdToSubscription.remove(subscriptionId);
}

View File

@ -126,15 +126,12 @@ public class SubscriptionActivatingSubscriber {
activateSubscription(activeStatus, theSubscription, requestedStatus);
}
} else if (activeStatus.equals(statusString)) {
if (!mySubscriptionInterceptor.hasSubscription(theSubscription.getIdElement())) {
ourLog.info("Registering active subscription {}", theSubscription.getIdElement().toUnqualified().getValue());
}
mySubscriptionInterceptor.registerSubscription(theSubscription.getIdElement(), theSubscription);
registerSubscriptionUnlessAlreadyRegistered(theSubscription);
} else {
if (mySubscriptionInterceptor.hasSubscription(theSubscription.getIdElement())) {
ourLog.info("Removing {} subscription {}", statusString, theSubscription.getIdElement().toUnqualified().getValue());
mySubscriptionInterceptor.unregisterSubscription(theSubscription.getIdElement());
}
mySubscriptionInterceptor.unregisterSubscription(theSubscription.getIdElement());
}
}
@ -145,7 +142,7 @@ public class SubscriptionActivatingSubscriber {
try {
SubscriptionUtil.setStatus(myCtx, subscription, theActiveStatus);
mySubscriptionDao.update(subscription);
mySubscriptionInterceptor.registerSubscription(subscription.getIdElement(), subscription);
registerSubscriptionUnlessAlreadyRegistered(subscription);
} catch (final UnprocessableEntityException e) {
ourLog.info("Changing status of {} to ERROR", subscription.getIdElement());
SubscriptionUtil.setStatus(myCtx, subscription, "error");
@ -179,6 +176,13 @@ public class SubscriptionActivatingSubscriber {
}
private void registerSubscriptionUnlessAlreadyRegistered(IBaseResource theSubscription) {
if (!mySubscriptionInterceptor.hasSubscription(theSubscription.getIdElement())) {
ourLog.info("Registering active subscription {}", theSubscription.getIdElement().toUnqualified().getValue());
mySubscriptionInterceptor.registerSubscription(theSubscription.getIdElement(), theSubscription);
}
}
@VisibleForTesting
public static void setWaitForSubscriptionActivationSynchronouslyForUnitTest(boolean theWaitForSubscriptionActivationSynchronouslyForUnitTest) {
ourWaitForSubscriptionActivationSynchronouslyForUnitTest = theWaitForSubscriptionActivationSynchronouslyForUnitTest;

View File

@ -1,115 +0,0 @@
package ca.uhn.fhir.jpa.util;
/*-
* #%L
* HAPI FHIR JPA Server
* %%
* Copyright (C) 2014 - 2018 University Health Network
* %%
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
* #L%
*/
import org.apache.http.HttpResponse;
import org.apache.http.client.HttpClient;
import org.apache.http.client.methods.HttpDelete;
import org.apache.http.client.methods.HttpGet;
import org.apache.http.client.methods.HttpPost;
import org.apache.http.client.methods.HttpPut;
import org.apache.http.entity.StringEntity;
import org.apache.http.impl.client.DefaultHttpClient;
import org.apache.http.util.EntityUtils;
import javax.xml.ws.http.HTTPException;
import java.io.IOException;
import java.util.concurrent.TimeUnit;
/**
* Rest service utilities. Generally used in the tests
*/
public class RestUtilities {
public static final String CONTEXT_PATH = "";
public static final String APPLICATION_JSON = "application/json";
/**
* Get the response for a CXF REST service without an object parameter
*
* @param url
* @param typeRequest
* @return
* @throws IOException
*/
public static String getResponse(String url, MethodRequest typeRequest) throws IOException {
return getResponse(url, (StringEntity) null, typeRequest);
}
/**
* Get the response for a CXF REST service with an object parameter
*
* @param url
* @param parameterEntity
* @param typeRequest
* @return
* @throws IOException
*/
public static String getResponse(String url, StringEntity parameterEntity, MethodRequest typeRequest) throws IOException {
HttpClient httpclient = new DefaultHttpClient();
HttpResponse response;
switch (typeRequest) {
case POST:
HttpPost httppost = new HttpPost(url);
httppost.setHeader("Content-type", APPLICATION_JSON);
if (parameterEntity != null) {
httppost.setEntity(parameterEntity);
}
response = httpclient.execute(httppost);
break;
case PUT:
HttpPut httpPut = new HttpPut(url);
httpPut.setHeader("Content-type", APPLICATION_JSON);
if (parameterEntity != null) {
httpPut.setEntity(parameterEntity);
}
response = httpclient.execute(httpPut);
break;
case DELETE:
HttpDelete httpDelete = new HttpDelete(url);
httpDelete.setHeader("Content-type", APPLICATION_JSON);
response = httpclient.execute(httpDelete);
break;
case GET:
HttpGet httpGet = new HttpGet(url);
httpGet.setHeader("Content-type", APPLICATION_JSON);
response = httpclient.execute(httpGet);
break;
default:
throw new IllegalArgumentException("Cannot handle type request " + typeRequest);
}
if (response.getStatusLine().getStatusCode() < 200 || response.getStatusLine().getStatusCode() >= 300) {
throw new HTTPException(response.getStatusLine().getStatusCode());
}
if (response.getStatusLine().getStatusCode() == 204) {
return "";
}
//Closes connections that have already been closed by the server
//org.apache.http.NoHttpResponseException: The target server failed to respond
httpclient.getConnectionManager().closeIdleConnections(1, TimeUnit.SECONDS);
return EntityUtils.toString(response.getEntity());
}
}