This commit is contained in:
Anthony Sute 2019-11-11 17:22:50 -05:00
commit b27af398b4
6 changed files with 27 additions and 11 deletions

View File

@ -28,7 +28,7 @@ import java.util.concurrent.LinkedBlockingQueue;
public class LinkedBlockingQueueSubscribableChannelFactory implements ISubscribableChannelFactory { public class LinkedBlockingQueueSubscribableChannelFactory implements ISubscribableChannelFactory {
@Override @Override
public SubscribableChannel createSubscribableChannel(String theChannelName, int theConcurrentConsumers) { public SubscribableChannel createSubscribableChannel(String theChannelName, Class theMessageType, int theConcurrentConsumers) {
return new LinkedBlockingQueueSubscribableChannel(new LinkedBlockingQueue<>(SubscriptionConstants.DELIVERY_EXECUTOR_QUEUE_SIZE), theChannelName + "-%d", theConcurrentConsumers); return new LinkedBlockingQueueSubscribableChannel(new LinkedBlockingQueue<>(SubscriptionConstants.DELIVERY_EXECUTOR_QUEUE_SIZE), theChannelName + "-%d", theConcurrentConsumers);
} }

View File

@ -23,7 +23,7 @@ package ca.uhn.fhir.jpa.subscription.module.channel;
import org.springframework.messaging.SubscribableChannel; import org.springframework.messaging.SubscribableChannel;
public interface ISubscribableChannelFactory { public interface ISubscribableChannelFactory {
SubscribableChannel createSubscribableChannel(String theChannelName, int theConcurrentConsumers); SubscribableChannel createSubscribableChannel(String theChannelName, Class theMessageType, int theConcurrentConsumers);
int getDeliveryChannelConcurrentConsumers(); int getDeliveryChannelConcurrentConsumers();

View File

@ -20,6 +20,8 @@ package ca.uhn.fhir.jpa.subscription.module.channel;
* #L% * #L%
*/ */
import ca.uhn.fhir.jpa.subscription.module.ResourceModifiedMessage;
import ca.uhn.fhir.jpa.subscription.module.subscriber.ResourceDeliveryMessage;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.SubscribableChannel; import org.springframework.messaging.SubscribableChannel;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
@ -35,10 +37,10 @@ public class SubscriptionChannelFactory {
} }
public SubscribableChannel newDeliveryChannel(String theChannelName) { public SubscribableChannel newDeliveryChannel(String theChannelName) {
return mySubscribableChannelFactory.createSubscribableChannel(theChannelName, mySubscribableChannelFactory.getDeliveryChannelConcurrentConsumers()); return mySubscribableChannelFactory.createSubscribableChannel(theChannelName, ResourceDeliveryMessage.class, mySubscribableChannelFactory.getDeliveryChannelConcurrentConsumers());
} }
public SubscribableChannel newMatchingChannel(String theChannelName) { public SubscribableChannel newMatchingChannel(String theChannelName) {
return mySubscribableChannelFactory.createSubscribableChannel(theChannelName, mySubscribableChannelFactory.getMatchingChannelConcurrentConsumers()); return mySubscribableChannelFactory.createSubscribableChannel(theChannelName, ResourceModifiedMessage.class, mySubscribableChannelFactory.getMatchingChannelConcurrentConsumers());
} }
} }

View File

@ -48,9 +48,11 @@ public class PointcutLatch implements IAnonymousInterceptor, IPointcutLatch {
private final AtomicReference<CountDownLatch> myCountdownLatch = new AtomicReference<>(); private final AtomicReference<CountDownLatch> myCountdownLatch = new AtomicReference<>();
private final AtomicReference<List<String>> myFailures = new AtomicReference<>(); private final AtomicReference<List<String>> myFailures = new AtomicReference<>();
private final AtomicReference<List<HookParams>> myCalledWith = new AtomicReference<>(); private final AtomicReference<List<HookParams>> myCalledWith = new AtomicReference<>();
private int myDefaultTimeoutSeconds = DEFAULT_TIMEOUT_SECONDS;
private final Pointcut myPointcut; private final Pointcut myPointcut;
private int myInitialCount; private int myInitialCount;
public PointcutLatch(Pointcut thePointcut) { public PointcutLatch(Pointcut thePointcut) {
this.name = thePointcut.name(); this.name = thePointcut.name();
myPointcut = thePointcut; myPointcut = thePointcut;
@ -61,6 +63,11 @@ public class PointcutLatch implements IAnonymousInterceptor, IPointcutLatch {
myPointcut = null; myPointcut = null;
} }
public PointcutLatch setDefaultTimeoutSeconds(int theDefaultTimeoutSeconds) {
myDefaultTimeoutSeconds = theDefaultTimeoutSeconds;
return this;
}
@Override @Override
public void setExpectedCount(int count) { public void setExpectedCount(int count) {
if (myCountdownLatch.get() != null) { if (myCountdownLatch.get() != null) {
@ -91,7 +98,7 @@ public class PointcutLatch implements IAnonymousInterceptor, IPointcutLatch {
@Override @Override
public List<HookParams> awaitExpected() throws InterruptedException { public List<HookParams> awaitExpected() throws InterruptedException {
return awaitExpectedWithTimeout(DEFAULT_TIMEOUT_SECONDS); return awaitExpectedWithTimeout(myDefaultTimeoutSeconds);
} }
public List<HookParams> awaitExpectedWithTimeout(int timeoutSecond) throws InterruptedException { public List<HookParams> awaitExpectedWithTimeout(int timeoutSecond) throws InterruptedException {

View File

@ -625,8 +625,8 @@
<hibernate_validator_version>5.4.2.Final</hibernate_validator_version> <hibernate_validator_version>5.4.2.Final</hibernate_validator_version>
<httpcore_version>4.4.11</httpcore_version> <httpcore_version>4.4.11</httpcore_version>
<httpclient_version>4.5.9</httpclient_version> <httpclient_version>4.5.9</httpclient_version>
<jackson_version>2.9.9</jackson_version> <jackson_version>2.10.0</jackson_version>
<jackson_databind_version>2.9.10</jackson_databind_version> <jackson_databind_version>2.10.0</jackson_databind_version>
<maven_assembly_plugin_version>3.1.0</maven_assembly_plugin_version> <maven_assembly_plugin_version>3.1.0</maven_assembly_plugin_version>
<maven_license_plugin_version>1.8</maven_license_plugin_version> <maven_license_plugin_version>1.8</maven_license_plugin_version>
<resteasy_version>4.0.0.Beta3</resteasy_version> <resteasy_version>4.0.0.Beta3</resteasy_version>

View File

@ -79,6 +79,13 @@
stored in the RDBMS. stored in the RDBMS.
]]> ]]>
</action> </action>
<action type="fix">
<![CDATA[
<b>New Feature</b>:
The R4 structures have been upgraded to the new 4.0.1 (Technical Correction) release, and the
R5 structure have been upgraded to the current (October) snapshot.
]]>
</action>
<action type="add" issue="1489"> <action type="add" issue="1489">
<![CDATA[ <![CDATA[
<b>Performance Improvement</b>: <b>Performance Improvement</b>: