pre-review cleanup

This commit is contained in:
Ken Stevens 2019-10-02 08:36:54 -04:00
parent ba37ce0588
commit 53f949c2d0
19 changed files with 20 additions and 149 deletions

View File

@ -6,8 +6,8 @@ import ca.uhn.fhir.interceptor.api.IInterceptorService;
import ca.uhn.fhir.interceptor.executor.InterceptorService;
import ca.uhn.fhir.jpa.binstore.BinaryAccessProvider;
import ca.uhn.fhir.jpa.binstore.BinaryStorageInterceptor;
import ca.uhn.fhir.jpa.bulk.BulkDataExportSvcImpl;
import ca.uhn.fhir.jpa.bulk.BulkDataExportProvider;
import ca.uhn.fhir.jpa.bulk.BulkDataExportSvcImpl;
import ca.uhn.fhir.jpa.bulk.IBulkDataExportSvc;
import ca.uhn.fhir.jpa.dao.DaoRegistry;
import ca.uhn.fhir.jpa.graphql.JpaStorageServices;
@ -28,8 +28,8 @@ import ca.uhn.fhir.jpa.search.reindex.IResourceReindexingSvc;
import ca.uhn.fhir.jpa.search.reindex.ResourceReindexingSvcImpl;
import ca.uhn.fhir.jpa.subscription.dbmatcher.CompositeInMemoryDaoSubscriptionMatcher;
import ca.uhn.fhir.jpa.subscription.dbmatcher.DaoSubscriptionMatcher;
import ca.uhn.fhir.jpa.subscription.module.channel.ISubscribableChannelFactory;
import ca.uhn.fhir.jpa.subscription.module.cache.LinkedBlockingQueueSubscribableChannelFactory;
import ca.uhn.fhir.jpa.subscription.module.channel.ISubscribableChannelFactory;
import ca.uhn.fhir.jpa.subscription.module.matcher.ISubscriptionMatcher;
import ca.uhn.fhir.jpa.subscription.module.matcher.InMemorySubscriptionMatcher;
import ca.uhn.fhir.rest.server.interceptor.consent.IConsentContextServices;

View File

@ -22,13 +22,13 @@ package ca.uhn.fhir.jpa.subscription;
import ca.uhn.fhir.context.FhirContext;
import ca.uhn.fhir.context.RuntimeResourceDefinition;
import ca.uhn.fhir.interceptor.api.Hook;
import ca.uhn.fhir.interceptor.api.Interceptor;
import ca.uhn.fhir.interceptor.api.Pointcut;
import ca.uhn.fhir.jpa.config.BaseConfig;
import ca.uhn.fhir.jpa.dao.DaoConfig;
import ca.uhn.fhir.jpa.dao.DaoRegistry;
import ca.uhn.fhir.jpa.dao.IFhirResourceDao;
import ca.uhn.fhir.interceptor.api.Hook;
import ca.uhn.fhir.interceptor.api.Interceptor;
import ca.uhn.fhir.interceptor.api.Pointcut;
import ca.uhn.fhir.jpa.subscription.module.ResourceModifiedMessage;
import ca.uhn.fhir.jpa.subscription.module.cache.SubscriptionCanonicalizer;
import ca.uhn.fhir.jpa.subscription.module.cache.SubscriptionConstants;

View File

@ -20,8 +20,8 @@ package ca.uhn.fhir.jpa.subscription;
* #L%
*/
import ca.uhn.fhir.jpa.dao.DaoConfig;
import ca.uhn.fhir.interceptor.api.IInterceptorService;
import ca.uhn.fhir.jpa.dao.DaoConfig;
import ca.uhn.fhir.jpa.subscription.module.cache.SubscriptionLoader;
import ca.uhn.fhir.jpa.subscription.module.cache.SubscriptionRegistry;
import ca.uhn.fhir.jpa.subscription.module.channel.SubscriptionChannelRegistry;
@ -71,7 +71,7 @@ public class SubscriptionInterceptorLoader {
private void loadSubscriptions() {
ourLog.info("Loading subscriptions into the SubscriptionRegistry...");
// Activate scheduled subscription loads into the SubscriptionRegistry
// Load active subscriptions into the SubscriptionRegistry and activate their channels
SubscriptionLoader loader = myApplicationContext.getBean(SubscriptionLoader.class);
loader.syncSubscriptions();
ourLog.info("...{} subscriptions loaded", mySubscriptionRegistry.size());

View File

@ -5,7 +5,7 @@ import ca.uhn.fhir.jpa.subscription.module.LinkedBlockingQueueSubscribableChanne
import ca.uhn.fhir.jpa.subscription.module.cache.ActiveSubscription;
import ca.uhn.fhir.jpa.subscription.module.cache.SubscriptionRegistry;
import ca.uhn.fhir.jpa.subscription.module.channel.SubscriptionChannelRegistry;
import ca.uhn.fhir.jpa.subscription.module.subscriber.email.IEmailSender;
import ca.uhn.fhir.jpa.subscription.module.channel.SubscriptionChannelWithHandlers;
import ca.uhn.fhir.jpa.subscription.module.subscriber.email.JavaMailEmailSender;
import ca.uhn.fhir.jpa.subscription.module.subscriber.email.SubscriptionDeliveringEmailSubscriber;
import org.hl7.fhir.dstu2.model.Subscription;
@ -79,7 +79,8 @@ public class SubscriptionTestUtil {
public void setEmailSender(IIdType theIdElement) {
ActiveSubscription activeSubscription = mySubscriptionRegistry.get(theIdElement.getIdPart());
SubscriptionDeliveringEmailSubscriber subscriber = (SubscriptionDeliveringEmailSubscriber) mySubscriptionChannelRegistry.get(activeSubscription.getChannelName()).getDeliveryHandlerForUnitTest();
SubscriptionChannelWithHandlers subscriptionChannelWithHandlers = mySubscriptionChannelRegistry.get(activeSubscription.getChannelName());
SubscriptionDeliveringEmailSubscriber subscriber = (SubscriptionDeliveringEmailSubscriber) subscriptionChannelWithHandlers.getDeliveryHandlerForUnitTest();
subscriber.setEmailSender(myEmailSender);
}

View File

@ -20,10 +20,8 @@ package ca.uhn.fhir.jpa.subscription.module.cache;
* #L%
*/
import ca.uhn.fhir.context.FhirContext;
import ca.uhn.fhir.jpa.subscription.module.CanonicalSubscription;
import ca.uhn.fhir.jpa.subscription.module.CanonicalSubscriptionChannelType;
import org.hl7.fhir.instance.model.api.IIdType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

View File

@ -21,8 +21,8 @@ package ca.uhn.fhir.jpa.subscription.module.cache;
*/
import ca.uhn.fhir.jpa.subscription.module.LinkedBlockingQueueSubscribableChannel;
import org.springframework.messaging.SubscribableChannel;
import ca.uhn.fhir.jpa.subscription.module.channel.ISubscribableChannelFactory;
import org.springframework.messaging.SubscribableChannel;
import java.util.concurrent.LinkedBlockingQueue;

View File

@ -1,15 +1,10 @@
package ca.uhn.fhir.jpa.subscription.module.channel;
import ca.uhn.fhir.jpa.subscription.module.cache.ActiveSubscription;
import ca.uhn.fhir.jpa.subscription.module.cache.SubscriptionRegistry;
import com.google.common.collect.Multimap;
import com.google.common.collect.MultimapBuilder;
import org.apache.commons.lang3.Validate;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Collection;
import java.util.Collections;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
@ -22,10 +17,6 @@ class SubscriptionChannelCache {
return myCache.get(theChannelName);
}
public Collection<SubscriptionChannelWithHandlers> getAll() {
return Collections.unmodifiableCollection(myCache.values());
}
public int size() {
return myCache.size();
}
@ -34,7 +25,7 @@ class SubscriptionChannelCache {
myCache.put(theChannelName, theValue);
}
public synchronized void remove(String theChannelName) {
synchronized void closeAndRemove(String theChannelName) {
Validate.notBlank(theChannelName);
SubscriptionChannelWithHandlers subscriptionChannelWithHandlers = myCache.get(theChannelName);

View File

@ -20,11 +20,8 @@ package ca.uhn.fhir.jpa.subscription.module.channel;
* #L%
*/
import ca.uhn.fhir.jpa.subscription.module.CanonicalSubscription;
import org.springframework.messaging.SubscribableChannel;
import ca.uhn.fhir.jpa.subscription.module.channel.ISubscribableChannelFactory;
import ca.uhn.fhir.jpa.subscription.module.channel.ISubscriptionDeliveryChannelNamer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.SubscribableChannel;
import org.springframework.stereotype.Component;
@Component

View File

@ -61,19 +61,19 @@ public class SubscriptionChannelRegistry {
return;
}
String channelName = theActiveSubscription.getChannelName();
ourLog.info("Removing subscription {} from channel {}: {}", theActiveSubscription.getId() ,channelName, myActiveSubscriptionByChannelName);
ourLog.info("Removing subscription {} from channel {}", theActiveSubscription.getId() ,channelName);
boolean removed = myActiveSubscriptionByChannelName.remove(channelName, theActiveSubscription.getId());
if (!removed) {
ourLog.warn("Failed to remove subscription {} from channel {}", theActiveSubscription.getId() ,channelName);
}
// This was the last one. Shut down the channel
// This was the last one. Close and remove the channel
if (!myActiveSubscriptionByChannelName.containsKey(channelName)) {
SubscriptionChannelWithHandlers channel = mySubscriptionChannelCache.get(channelName);
if (channel != null) {
channel.close();
}
mySubscriptionChannelCache.remove(channelName);
mySubscriptionChannelCache.closeAndRemove(channelName);
}
}

View File

@ -63,8 +63,4 @@ public class SubscriptionChannelWithHandlers implements Closeable {
public MessageChannel getChannel() {
return mySubscribableChannel;
}
public String getChannelName() {
return myChannelName;
}
}

View File

@ -20,16 +20,13 @@ package ca.uhn.fhir.jpa.subscription.module.channel;
* #L%
*/
import ca.uhn.fhir.jpa.subscription.module.CanonicalSubscription;
import ca.uhn.fhir.jpa.subscription.module.CanonicalSubscriptionChannelType;
import ca.uhn.fhir.jpa.subscription.module.subscriber.SubscriptionDeliveringRestHookSubscriber;
import ca.uhn.fhir.jpa.subscription.module.subscriber.email.IEmailSender;
import ca.uhn.fhir.jpa.subscription.module.subscriber.email.SubscriptionDeliveringEmailSubscriber;
import org.hl7.fhir.r4.model.Subscription;
import org.springframework.beans.factory.annotation.Lookup;
import org.springframework.messaging.MessageHandler;
import org.springframework.stereotype.Component;
import org.thymeleaf.util.Validate;
import java.util.Optional;

View File

@ -21,8 +21,8 @@ package ca.uhn.fhir.jpa.subscription.module.config;
*/
import ca.uhn.fhir.interceptor.executor.InterceptorService;
import ca.uhn.fhir.jpa.subscription.module.channel.ISubscribableChannelFactory;
import ca.uhn.fhir.jpa.subscription.module.cache.LinkedBlockingQueueSubscribableChannelFactory;
import ca.uhn.fhir.jpa.subscription.module.channel.ISubscribableChannelFactory;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.ComponentScan;
import org.springframework.context.annotation.Configuration;

View File

@ -159,7 +159,6 @@ public class SubscriptionWebsocketHandler extends TextWebSocketHandler implement
handleFailure(e);
}
}
}
private class InitialState implements IState {
@ -212,94 +211,3 @@ public class SubscriptionWebsocketHandler extends TextWebSocketHandler implement
}
}
// private IIdType bingSearch(WebSocketSession theSession, String theRemaining) {
// Subscription subscription = new Subscription();
// subscription.getChannel().setType(SubscriptionChannelType.WEBSOCKET);
// subscription.setStatus(SubscriptionStatus.ACTIVE);
// subscription.setCriteria(theRemaining);
//
// try {
// String params = theRemaining.substring(theRemaining.indexOf('?')+1);
// List<NameValuePair> paramValues = URLEncodedUtils.parse(params, Constants.CHARSET_UTF8, '&');
// EncodingEnum encoding = EncodingEnum.JSON;
// for (NameValuePair nameValuePair : paramValues) {
// if (Constants.PARAM_FORMAT.equals(nameValuePair.getName())) {
// EncodingEnum nextEncoding = EncodingEnum.forContentType(nameValuePair.getValue());
// if (nextEncoding != null) {
// encoding = nextEncoding;
// }
// }
// }
//
// IIdType id = ourSubscriptionDao.create(subscription).getId();
//
// mySubscriptionPid = ourSubscriptionDao.getSubscriptionTablePidForSubscriptionResource(id);
// mySubscriptionId = subscription.getIdElement();
// myState = new BoundDynamicSubscriptionState(theSession, encoding);
//
// return id;
// } catch (UnprocessableEntityException e) {
// ourLog.warn("Failed to bind subscription: " + e.getMessage());
// try {
// theSession.close(new CloseStatus(CloseStatus.PROTOCOL_ERROR.getCode(), "Invalid bind request - " + e.getMessage()));
// } catch (IOException e2) {
// handleFailure(e2);
// }
// } catch (Exception e) {
// handleFailure(e);
// try {
// theSession.close(new CloseStatus(CloseStatus.PROTOCOL_ERROR.getCode(), "Invalid bind request - No ID included"));
// } catch (IOException e2) {
// handleFailure(e2);
// }
// }
// return null;
// }
//private class BoundDynamicSubscriptionState implements SubscriptionWebsocketHandler.IState {
//
// private EncodingEnum myEncoding;
// private WebSocketSession mySession;
//
// public BoundDynamicSubscriptionState(WebSocketSession theSession, EncodingEnum theEncoding) {
// mySession = theSession;
// myEncoding = theEncoding;
// }
//
// @Override
// public void closing() {
// ourLog.info("Deleting subscription {}", mySubscriptionId);
// try {
// ourSubscriptionDao.delete(mySubscriptionId, null);
// } catch (Exception e) {
// handleFailure(e);
// }
// }
//
// @Override
// public void deliver(List<IBaseResource> theResults) {
// try {
// for (IBaseResource nextResource : theResults) {
// ourLog.info("Sending WebSocket message for resource: {}", nextResource.getIdElement());
// String encoded = myEncoding.newParser(ourCtx).encodeResourceToString(nextResource);
// String payload = "add " + mySubscriptionId.getIdPart() + '\n' + encoded;
// mySession.sendMessage(new TextMessage(payload));
// }
// } catch (IOException e) {
// handleFailure(e);
// }
// }
//
// @Override
// public void handleTextMessage(WebSocketSession theSession, TextMessage theMessage) {
// try {
// theSession.sendMessage(new TextMessage("Unexpected client message: " + theMessage.getPayload()));
// } catch (IOException e) {
// handleFailure(e);
// }
// }
//
//}

View File

@ -1,13 +1,8 @@
package ca.uhn.fhir.jpa.subscription.module.cache;
import ca.uhn.fhir.jpa.subscription.module.BaseSubscriptionDstu3Test;
import ca.uhn.fhir.jpa.subscription.module.channel.SubscriptionChannelRegistry;
import org.hl7.fhir.dstu3.model.Subscription;
import org.junit.After;
import org.springframework.beans.factory.annotation.Autowired;
import static org.awaitility.Awaitility.await;
import static org.junit.Assert.assertEquals;
public abstract class BaseSubscriptionRegistryTest extends BaseSubscriptionDstu3Test {
public static final String SUBSCRIPTION_ID = "1";

View File

@ -2,17 +2,13 @@ package ca.uhn.fhir.jpa.subscription.module.cache;
import ca.uhn.fhir.jpa.subscription.module.CanonicalSubscription;
import ca.uhn.fhir.jpa.subscription.module.channel.ISubscriptionDeliveryChannelNamer;
import ca.uhn.fhir.jpa.subscription.module.channel.SubscriptionDeliveryChannelNamer;
import org.hl7.fhir.dstu3.model.Subscription;
import org.junit.After;
import org.junit.Test;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Primary;
import org.springframework.test.annotation.DirtiesContext;
import static org.junit.Assert.*;
@DirtiesContext(classMode = DirtiesContext.ClassMode.AFTER_CLASS)
public class SubscriptionRegistrySharedTest extends BaseSubscriptionRegistryTest {

View File

@ -1,7 +1,6 @@
package ca.uhn.fhir.jpa.subscription.module.cache;
import org.hl7.fhir.dstu3.model.Subscription;
import org.junit.After;
import org.junit.Test;
import static org.junit.Assert.*;

View File

@ -10,10 +10,10 @@ import ca.uhn.fhir.jpa.subscription.module.BaseSubscriptionDstu3Test;
import ca.uhn.fhir.jpa.subscription.module.CanonicalSubscription;
import ca.uhn.fhir.jpa.subscription.module.CanonicalSubscriptionChannelType;
import ca.uhn.fhir.jpa.subscription.module.ResourceModifiedMessage;
import ca.uhn.fhir.jpa.subscription.module.channel.ISubscriptionDeliveryChannelNamer;
import ca.uhn.fhir.jpa.subscription.module.channel.SubscriptionChannelFactory;
import ca.uhn.fhir.jpa.subscription.module.cache.SubscriptionLoader;
import ca.uhn.fhir.jpa.subscription.module.cache.SubscriptionRegistry;
import ca.uhn.fhir.jpa.subscription.module.channel.ISubscriptionDeliveryChannelNamer;
import ca.uhn.fhir.jpa.subscription.module.channel.SubscriptionChannelFactory;
import ca.uhn.fhir.jpa.subscription.module.config.MockFhirClientSubscriptionProvider;
import ca.uhn.fhir.jpa.subscription.module.subscriber.ResourceModifiedJsonMessage;
import ca.uhn.fhir.jpa.subscription.module.subscriber.SubscriptionMatchingSubscriberTest;
@ -210,8 +210,6 @@ public abstract class BaseBlockingQueueSubscribableChannelDstu3Test extends Base
}
@Override
public void clear() {
updateLatch.clear();
}
public void clear() { updateLatch.clear();}
}
}

View File

@ -1,9 +1,6 @@
package ca.uhn.fhir.jpa.subscription.module.standalone;
import ca.uhn.fhir.jpa.subscription.module.cache.SubscriptionLoader;
import ca.uhn.fhir.jpa.subscription.module.config.MockFhirClientSubscriptionProvider;
import ca.uhn.fhir.rest.api.server.IBundleProvider;
import ca.uhn.fhir.rest.server.SimpleBundleProvider;
import org.hl7.fhir.dstu3.model.Subscription;
import org.junit.After;
import org.junit.Before;

View File

@ -4,7 +4,6 @@ import ca.uhn.fhir.jpa.subscription.module.CanonicalSubscription;
import ca.uhn.fhir.jpa.subscription.module.CanonicalSubscriptionChannelType;
import ca.uhn.fhir.jpa.subscription.module.cache.ActiveSubscription;
import ca.uhn.fhir.jpa.subscription.module.cache.SubscriptionRegistry;
import ca.uhn.fhir.jpa.subscription.module.channel.SubscriptionChannelRegistry;
import org.hl7.fhir.r4.model.IdType;
import org.junit.Before;
import org.junit.Test;
@ -13,7 +12,6 @@ import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.mock.mockito.MockBean;
import org.springframework.context.annotation.ComponentScan;
import org.springframework.context.annotation.Configuration;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit4.SpringRunner;
import static org.junit.Assert.*;