From ece0c9defea24ae646ebf19ca9bf8ad76dcb5cfb Mon Sep 17 00:00:00 2001 From: James Agnew Date: Fri, 10 Aug 2018 09:07:55 -0400 Subject: [PATCH] Subscription cleanup --- .../java/ca/uhn/fhir/cli/BaseCommand.java | 10 +- .../java/ca/uhn/fhir/jpa/dao/DaoConfig.java | 8 +- .../ResourceHistoryTag.java_70782329243090 | 0 .../BaseSubscriptionInterceptor.java | 30 +++-- .../SubscriptionActivatingSubscriber.java | 16 ++- .../email/SubscriptionEmailInterceptor.java | 4 +- .../SubscriptionRestHookInterceptor.java | 4 +- .../SubscriptionWebsocketInterceptor.java | 4 +- .../ca/uhn/fhir/jpa/util/RestUtilities.java | 115 ------------------ 9 files changed, 46 insertions(+), 145 deletions(-) create mode 100644 hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/entity/ResourceHistoryTag.java_70782329243090 delete mode 100644 hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/util/RestUtilities.java diff --git a/hapi-fhir-cli/hapi-fhir-cli-api/src/main/java/ca/uhn/fhir/cli/BaseCommand.java b/hapi-fhir-cli/hapi-fhir-cli-api/src/main/java/ca/uhn/fhir/cli/BaseCommand.java index 736f374335b..53c80064dc6 100644 --- a/hapi-fhir-cli/hapi-fhir-cli-api/src/main/java/ca/uhn/fhir/cli/BaseCommand.java +++ b/hapi-fhir-cli/hapi-fhir-cli-api/src/main/java/ca/uhn/fhir/cli/BaseCommand.java @@ -108,7 +108,7 @@ public abstract class BaseCommand implements Comparable { if (theOptions.getOption(theOpt) != null) { throw new IllegalStateException("Duplicate option: " + theOpt); } - if (theOptionGroup != null && theOptionGroup.getOptions().stream().anyMatch(t-> theOpt.equals(t.getOpt()))) { + if (theOptionGroup != null && theOptionGroup.getOptions().stream().anyMatch(t -> theOpt.equals(t.getOpt()))) { throw new IllegalStateException("Duplicate option: " + theOpt); } } @@ -116,7 +116,7 @@ public abstract class BaseCommand implements Comparable { if (theOptions.getOption(theLongOpt) != null) { throw new IllegalStateException("Duplicate option: " + theLongOpt); } - if (theOptionGroup != null && theOptionGroup.getOptions().stream().anyMatch(t-> theLongOpt.equals(t.getLongOpt()))) { + if (theOptionGroup != null && theOptionGroup.getOptions().stream().anyMatch(t -> theLongOpt.equals(t.getLongOpt()))) { throw new IllegalStateException("Duplicate option: " + theOpt); } } @@ -362,8 +362,12 @@ public abstract class BaseCommand implements Comparable { throw new ParseException("Invalid target server specified, must begin with 'http' or 'file'."); } + return newClientWithBaseUrl(theCommandLine, baseUrl, theBasicAuthOptionName, theBearerTokenOptionName); + } + + protected IGenericClient newClientWithBaseUrl(CommandLine theCommandLine, String theBaseUrl, String theBasicAuthOptionName, String theBearerTokenOptionName) { myFhirCtx.getRestfulClientFactory().setSocketTimeout(10 * 60 * 1000); - IGenericClient retVal = myFhirCtx.newRestfulGenericClient(baseUrl); + IGenericClient retVal = myFhirCtx.newRestfulGenericClient(theBaseUrl); String basicAuthHeaderValue = getAndParseOptionBasicAuthHeader(theCommandLine, theBasicAuthOptionName); if (isNotBlank(basicAuthHeaderValue)) { diff --git a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/dao/DaoConfig.java b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/dao/DaoConfig.java index 6d86ce24f82..8268c3dae9a 100644 --- a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/dao/DaoConfig.java +++ b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/dao/DaoConfig.java @@ -706,9 +706,9 @@ public class DaoConfig { * references instead of being treated as real references. *

* A logical reference is a reference which is treated as an identifier, and - * does not neccesarily resolve. See {@link "http://hl7.org/fhir/references.html"} for + * does not neccesarily resolve. See references for * a description of logical references. For example, the valueset - * {@link "http://hl7.org/fhir/valueset-quantity-comparator.html"} is a logical + * valueset-quantity-comparator is a logical * reference. *

*

@@ -731,9 +731,9 @@ public class DaoConfig { * references instead of being treated as real references. *

* A logical reference is a reference which is treated as an identifier, and - * does not neccesarily resolve. See {@link "http://hl7.org/fhir/references.html"} for + * does not neccesarily resolve. See references for * a description of logical references. For example, the valueset - * {@link "http://hl7.org/fhir/valueset-quantity-comparator.html"} is a logical + * valueset-quantity-comparator is a logical * reference. *

*

diff --git a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/entity/ResourceHistoryTag.java_70782329243090 b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/entity/ResourceHistoryTag.java_70782329243090 new file mode 100644 index 00000000000..e69de29bb2d diff --git a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/subscription/BaseSubscriptionInterceptor.java b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/subscription/BaseSubscriptionInterceptor.java index 690acc5ccf8..1a6273d842e 100644 --- a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/subscription/BaseSubscriptionInterceptor.java +++ b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/subscription/BaseSubscriptionInterceptor.java @@ -9,9 +9,9 @@ package ca.uhn.fhir.jpa.subscription; * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@ -50,6 +50,7 @@ import org.hl7.fhir.instance.model.api.IIdType; import org.hl7.fhir.r4.model.Subscription; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.DisposableBean; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.core.task.AsyncTaskExecutor; @@ -82,7 +83,7 @@ public abstract class BaseSubscriptionInterceptor exten private SubscriptionActivatingSubscriber mySubscriptionActivatingSubscriber; private MessageHandler mySubscriptionCheckingSubscriber; private ConcurrentHashMap myIdToSubscription = new ConcurrentHashMap<>(); - private ConcurrentHashMap myIdToSubscribaleChannel = new ConcurrentHashMap<>(); + private ConcurrentHashMap mySubscribableChannel = new ConcurrentHashMap<>(); private Multimap myIdToDeliveryHandler = Multimaps.synchronizedListMultimap(ArrayListMultimap.create()); private Logger ourLog = LoggerFactory.getLogger(BaseSubscriptionInterceptor.class); private ThreadPoolExecutor myDeliveryExecutor; @@ -301,7 +302,7 @@ public abstract class BaseSubscriptionInterceptor exten } protected MessageChannel getDeliveryChannel(CanonicalSubscription theSubscription) { - return myIdToSubscribaleChannel.get(theSubscription.getIdElement(myCtx).getIdPart()); + return mySubscribableChannel.get(theSubscription.getIdElement(myCtx).getIdPart()); } public int getExecutorQueueSizeForUnitTests() { @@ -384,7 +385,7 @@ public abstract class BaseSubscriptionInterceptor exten } public void registerHandler(String theSubscriptionId, MessageHandler theHandler) { - myIdToSubscribaleChannel.get(theSubscriptionId).subscribe(theHandler); + mySubscribableChannel.get(theSubscriptionId).subscribe(theHandler); myIdToDeliveryHandler.put(theSubscriptionId, theHandler); } @@ -399,7 +400,7 @@ public abstract class BaseSubscriptionInterceptor exten SubscribableChannel deliveryChannel = createDeliveryChannel(canonicalized); Optional deliveryHandler = createDeliveryHandler(canonicalized); - myIdToSubscribaleChannel.put(subscriptionId, deliveryChannel); + mySubscribableChannel.put(subscriptionId, deliveryChannel); myIdToSubscription.put(subscriptionId, canonicalized); deliveryHandler.ifPresent(handler -> registerHandler(subscriptionId, handler)); @@ -545,13 +546,20 @@ public abstract class BaseSubscriptionInterceptor exten } } - public void unregisterHandler(String theSubscriptionId, MessageHandler next) { - SubscribableChannel channel = myIdToSubscribaleChannel.get(theSubscriptionId); + public void unregisterHandler(String theSubscriptionId, MessageHandler theMessageHandler) { + SubscribableChannel channel = mySubscribableChannel.get(theSubscriptionId); if (channel != null) { - channel.unsubscribe(next); + if (channel instanceof DisposableBean) { + try { + ((DisposableBean) channel).destroy(); + } catch (Exception e) { + ourLog.error("Failed to destroy channel bean", e); + } + } + channel.unsubscribe(theMessageHandler); } - myIdToSubscribaleChannel.remove(theSubscriptionId, next); + mySubscribableChannel.remove(theSubscriptionId, theMessageHandler); } @SuppressWarnings("UnusedReturnValue") @@ -565,7 +573,7 @@ public abstract class BaseSubscriptionInterceptor exten unregisterHandler(subscriptionId, next); } - myIdToSubscribaleChannel.remove(subscriptionId); + mySubscribableChannel.remove(subscriptionId); return myIdToSubscription.remove(subscriptionId); } diff --git a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/subscription/SubscriptionActivatingSubscriber.java b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/subscription/SubscriptionActivatingSubscriber.java index 89681e0c0da..13cdb3b0b74 100644 --- a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/subscription/SubscriptionActivatingSubscriber.java +++ b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/subscription/SubscriptionActivatingSubscriber.java @@ -126,15 +126,12 @@ public class SubscriptionActivatingSubscriber { activateSubscription(activeStatus, theSubscription, requestedStatus); } } else if (activeStatus.equals(statusString)) { - if (!mySubscriptionInterceptor.hasSubscription(theSubscription.getIdElement())) { - ourLog.info("Registering active subscription {}", theSubscription.getIdElement().toUnqualified().getValue()); - } - mySubscriptionInterceptor.registerSubscription(theSubscription.getIdElement(), theSubscription); + registerSubscriptionUnlessAlreadyRegistered(theSubscription); } else { if (mySubscriptionInterceptor.hasSubscription(theSubscription.getIdElement())) { ourLog.info("Removing {} subscription {}", statusString, theSubscription.getIdElement().toUnqualified().getValue()); + mySubscriptionInterceptor.unregisterSubscription(theSubscription.getIdElement()); } - mySubscriptionInterceptor.unregisterSubscription(theSubscription.getIdElement()); } } @@ -145,7 +142,7 @@ public class SubscriptionActivatingSubscriber { try { SubscriptionUtil.setStatus(myCtx, subscription, theActiveStatus); mySubscriptionDao.update(subscription); - mySubscriptionInterceptor.registerSubscription(subscription.getIdElement(), subscription); + registerSubscriptionUnlessAlreadyRegistered(subscription); } catch (final UnprocessableEntityException e) { ourLog.info("Changing status of {} to ERROR", subscription.getIdElement()); SubscriptionUtil.setStatus(myCtx, subscription, "error"); @@ -179,6 +176,13 @@ public class SubscriptionActivatingSubscriber { } + private void registerSubscriptionUnlessAlreadyRegistered(IBaseResource theSubscription) { + if (!mySubscriptionInterceptor.hasSubscription(theSubscription.getIdElement())) { + ourLog.info("Registering active subscription {}", theSubscription.getIdElement().toUnqualified().getValue()); + mySubscriptionInterceptor.registerSubscription(theSubscription.getIdElement(), theSubscription); + } + } + @VisibleForTesting public static void setWaitForSubscriptionActivationSynchronouslyForUnitTest(boolean theWaitForSubscriptionActivationSynchronouslyForUnitTest) { ourWaitForSubscriptionActivationSynchronouslyForUnitTest = theWaitForSubscriptionActivationSynchronouslyForUnitTest; diff --git a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/subscription/email/SubscriptionEmailInterceptor.java b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/subscription/email/SubscriptionEmailInterceptor.java index 35ff5fc1b64..270bf3ee9b1 100644 --- a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/subscription/email/SubscriptionEmailInterceptor.java +++ b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/subscription/email/SubscriptionEmailInterceptor.java @@ -9,9 +9,9 @@ package ca.uhn.fhir.jpa.subscription.email; * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. diff --git a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/subscription/resthook/SubscriptionRestHookInterceptor.java b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/subscription/resthook/SubscriptionRestHookInterceptor.java index 9c0558ba310..83ff6a3257d 100644 --- a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/subscription/resthook/SubscriptionRestHookInterceptor.java +++ b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/subscription/resthook/SubscriptionRestHookInterceptor.java @@ -9,9 +9,9 @@ package ca.uhn.fhir.jpa.subscription.resthook; * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. diff --git a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/subscription/websocket/SubscriptionWebsocketInterceptor.java b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/subscription/websocket/SubscriptionWebsocketInterceptor.java index 95509a3180e..9189ddc6cc2 100644 --- a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/subscription/websocket/SubscriptionWebsocketInterceptor.java +++ b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/subscription/websocket/SubscriptionWebsocketInterceptor.java @@ -9,9 +9,9 @@ package ca.uhn.fhir.jpa.subscription.websocket; * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. diff --git a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/util/RestUtilities.java b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/util/RestUtilities.java deleted file mode 100644 index 94acf15c4e4..00000000000 --- a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/util/RestUtilities.java +++ /dev/null @@ -1,115 +0,0 @@ -package ca.uhn.fhir.jpa.util; - -/*- - * #%L - * HAPI FHIR JPA Server - * %% - * Copyright (C) 2014 - 2018 University Health Network - * %% - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * #L% - */ - -import org.apache.http.HttpResponse; -import org.apache.http.client.HttpClient; -import org.apache.http.client.methods.HttpDelete; -import org.apache.http.client.methods.HttpGet; -import org.apache.http.client.methods.HttpPost; -import org.apache.http.client.methods.HttpPut; -import org.apache.http.entity.StringEntity; -import org.apache.http.impl.client.DefaultHttpClient; -import org.apache.http.util.EntityUtils; - -import javax.xml.ws.http.HTTPException; -import java.io.IOException; -import java.util.concurrent.TimeUnit; - -/** - * Rest service utilities. Generally used in the tests - */ -public class RestUtilities { - - public static final String CONTEXT_PATH = ""; - public static final String APPLICATION_JSON = "application/json"; - - /** - * Get the response for a CXF REST service without an object parameter - * - * @param url - * @param typeRequest - * @return - * @throws IOException - */ - public static String getResponse(String url, MethodRequest typeRequest) throws IOException { - return getResponse(url, (StringEntity) null, typeRequest); - } - - /** - * Get the response for a CXF REST service with an object parameter - * - * @param url - * @param parameterEntity - * @param typeRequest - * @return - * @throws IOException - */ - public static String getResponse(String url, StringEntity parameterEntity, MethodRequest typeRequest) throws IOException { - HttpClient httpclient = new DefaultHttpClient(); - HttpResponse response; - - switch (typeRequest) { - case POST: - HttpPost httppost = new HttpPost(url); - httppost.setHeader("Content-type", APPLICATION_JSON); - if (parameterEntity != null) { - httppost.setEntity(parameterEntity); - } - response = httpclient.execute(httppost); - break; - case PUT: - HttpPut httpPut = new HttpPut(url); - httpPut.setHeader("Content-type", APPLICATION_JSON); - if (parameterEntity != null) { - httpPut.setEntity(parameterEntity); - } - response = httpclient.execute(httpPut); - break; - case DELETE: - HttpDelete httpDelete = new HttpDelete(url); - httpDelete.setHeader("Content-type", APPLICATION_JSON); - response = httpclient.execute(httpDelete); - break; - case GET: - HttpGet httpGet = new HttpGet(url); - httpGet.setHeader("Content-type", APPLICATION_JSON); - response = httpclient.execute(httpGet); - break; - default: - throw new IllegalArgumentException("Cannot handle type request " + typeRequest); - } - - if (response.getStatusLine().getStatusCode() < 200 || response.getStatusLine().getStatusCode() >= 300) { - throw new HTTPException(response.getStatusLine().getStatusCode()); - } - - if (response.getStatusLine().getStatusCode() == 204) { - return ""; - } - - //Closes connections that have already been closed by the server - //org.apache.http.NoHttpResponseException: The target server failed to respond - httpclient.getConnectionManager().closeIdleConnections(1, TimeUnit.SECONDS); - - return EntityUtils.toString(response.getEntity()); - } -}