diff --git a/artemis-service-extensions/src/main/java/org/apache/activemq/artemis/service/extensions/xa/recovery/XARecoveryConfig.java b/artemis-service-extensions/src/main/java/org/apache/activemq/artemis/service/extensions/xa/recovery/XARecoveryConfig.java index 292395a55a..f9826e0210 100644 --- a/artemis-service-extensions/src/main/java/org/apache/activemq/artemis/service/extensions/xa/recovery/XARecoveryConfig.java +++ b/artemis-service-extensions/src/main/java/org/apache/activemq/artemis/service/extensions/xa/recovery/XARecoveryConfig.java @@ -45,16 +45,41 @@ public class XARecoveryConfig { private final Map properties; private final ClientProtocolManagerFactory clientProtocolManager; + // ServerLocator properties + private Long callFailoverTimeout; + private Long callTimeout; + private Long clientFailureCheckPeriod; + private Integer confirmationWindowSize; + private String connectionLoadBalancingPolicyClassName; + private Long connectionTTL; + private Integer consumerMaxRate; + private Integer consumerWindowSize; + private Integer initialConnectAttempts; + private Integer producerMaxRate; + private Integer producerWindowSize; + private Integer minLargeMessageSize; + private Long retryInterval; + private Double retryIntervalMultiplier; + private Long maxRetryInterval; + private Integer reconnectAttempts; + private Integer initialMessagePacketSize; + private Integer scheduledThreadPoolMaxSize; + private Integer threadPoolMaxSize; + private boolean autoGroup; + private boolean blockOnAcknowledge; + private boolean blockOnNonDurableSend; + private boolean blockOnDurableSend; + private boolean preAcknowledge; + private boolean useGlobalPools; + private boolean cacheLargeMessagesClient; + private boolean compressLargeMessage; + private boolean failoverOnInitialConnection; + public static XARecoveryConfig newConfig(ActiveMQConnectionFactory factory, String userName, String password, Map properties) { - if (factory.getServerLocator().getDiscoveryGroupConfiguration() != null) { - return new XARecoveryConfig(factory.getServerLocator().isHA(), factory.getServerLocator().getDiscoveryGroupConfiguration(), userName, password, properties, factory.getServerLocator().getProtocolManagerFactory()); - } else { - return new XARecoveryConfig(factory.getServerLocator().isHA(), factory.getServerLocator().getStaticTransportConfigurations(), userName, password, properties, factory.getServerLocator().getProtocolManagerFactory()); - } - + return new XARecoveryConfig(factory.getServerLocator(), userName, password, properties); } public XARecoveryConfig(final boolean ha, @@ -112,6 +137,37 @@ public class XARecoveryConfig { this(ha, discoveryConfiguration, username, password, properties, null); } + private XARecoveryConfig(ServerLocator serverLocator, + String username, + String password, + Map properties) { + ClientProtocolManagerFactory clientProtocolManager = serverLocator.getProtocolManagerFactory(); + if (serverLocator.getDiscoveryGroupConfiguration() != null) { + this.discoveryConfiguration = serverLocator.getDiscoveryGroupConfiguration(); + this.transportConfiguration = null; + } else { + TransportConfiguration[] transportConfiguration = serverLocator.getStaticTransportConfigurations(); + TransportConfiguration[] newTransportConfiguration = new TransportConfiguration[transportConfiguration.length]; + for (int i = 0; i < transportConfiguration.length; i++) { + if (clientProtocolManager != null) { + newTransportConfiguration[i] = clientProtocolManager.adaptTransportConfiguration(transportConfiguration[i].newTransportConfig("")); + } else { + newTransportConfiguration[i] = transportConfiguration[i].newTransportConfig(""); + } + } + + this.transportConfiguration = newTransportConfiguration; + this.discoveryConfiguration = null; + } + this.username = username; + this.password = password; + this.ha = serverLocator.isHA(); + this.properties = properties == null ? Collections.unmodifiableMap(new HashMap()) : Collections.unmodifiableMap(properties); + this.clientProtocolManager = clientProtocolManager; + + readLocatorProperties(serverLocator); + } + public boolean isHA() { return ha; } @@ -146,12 +202,87 @@ public class XARecoveryConfig { * @return locator */ public ServerLocator createServerLocator() { + ServerLocator serverLocator; if (getDiscoveryConfiguration() != null) { - return ActiveMQClient.createServerLocator(isHA(), getDiscoveryConfiguration()).setProtocolManagerFactory(clientProtocolManager); + serverLocator = ActiveMQClient.createServerLocator(isHA(), getDiscoveryConfiguration()).setProtocolManagerFactory(clientProtocolManager); } else { - return ActiveMQClient.createServerLocator(isHA(), getTransportConfig()).setProtocolManagerFactory(clientProtocolManager); + serverLocator = ActiveMQClient.createServerLocator(isHA(), getTransportConfig()).setProtocolManagerFactory(clientProtocolManager); } + writeLocatorProperties(serverLocator); + + return serverLocator; + } + + private void writeLocatorProperties(ServerLocator serverLocator) { + serverLocator.setAutoGroup(this.autoGroup); + serverLocator.setBlockOnAcknowledge(this.blockOnAcknowledge); + serverLocator.setBlockOnNonDurableSend(this.blockOnNonDurableSend); + serverLocator.setBlockOnDurableSend(this.blockOnDurableSend); + serverLocator.setPreAcknowledge(this.preAcknowledge); + serverLocator.setUseGlobalPools(this.useGlobalPools); + serverLocator.setCacheLargeMessagesClient(this.cacheLargeMessagesClient); + serverLocator.setCompressLargeMessage(this.compressLargeMessage); + serverLocator.setFailoverOnInitialConnection(this.failoverOnInitialConnection); + + serverLocator.setConsumerMaxRate(this.consumerMaxRate); + serverLocator.setConsumerWindowSize(this.consumerWindowSize); + serverLocator.setMinLargeMessageSize(this.minLargeMessageSize); + serverLocator.setProducerMaxRate(this.producerMaxRate); + serverLocator.setProducerWindowSize(this.producerWindowSize); + serverLocator.setConfirmationWindowSize(this.confirmationWindowSize); + serverLocator.setReconnectAttempts(this.reconnectAttempts); + serverLocator.setThreadPoolMaxSize(this.threadPoolMaxSize); + serverLocator.setScheduledThreadPoolMaxSize(this.scheduledThreadPoolMaxSize); + serverLocator.setInitialConnectAttempts(this.initialConnectAttempts); + serverLocator.setInitialMessagePacketSize(this.initialMessagePacketSize); + + serverLocator.setClientFailureCheckPeriod(this.clientFailureCheckPeriod); + serverLocator.setCallTimeout(this.callTimeout); + serverLocator.setCallFailoverTimeout(this.callFailoverTimeout); + serverLocator.setConnectionTTL(this.connectionTTL); + serverLocator.setRetryInterval(this.retryInterval); + serverLocator.setMaxRetryInterval(this.maxRetryInterval); + + serverLocator.setRetryIntervalMultiplier(this.retryIntervalMultiplier); + + serverLocator.setConnectionLoadBalancingPolicyClassName(this.connectionLoadBalancingPolicyClassName); +} + + private void readLocatorProperties(ServerLocator locator) { + + this.autoGroup = locator.isAutoGroup(); + this.blockOnAcknowledge = locator.isBlockOnAcknowledge(); + this.blockOnNonDurableSend = locator.isBlockOnNonDurableSend(); + this.blockOnDurableSend = locator.isBlockOnDurableSend(); + this.preAcknowledge = locator.isPreAcknowledge(); + this.useGlobalPools = locator.isUseGlobalPools(); + this.cacheLargeMessagesClient = locator.isCacheLargeMessagesClient(); + this.compressLargeMessage = locator.isCompressLargeMessage(); + this.failoverOnInitialConnection = locator.isFailoverOnInitialConnection(); + + this.consumerMaxRate = locator.getConsumerMaxRate(); + this.consumerWindowSize = locator.getConsumerWindowSize(); + this.minLargeMessageSize = locator.getMinLargeMessageSize(); + this.producerMaxRate = locator.getProducerMaxRate(); + this.producerWindowSize = locator.getProducerWindowSize(); + this.confirmationWindowSize = locator.getConfirmationWindowSize(); + this.reconnectAttempts = locator.getReconnectAttempts(); + this.threadPoolMaxSize = locator.getThreadPoolMaxSize(); + this.scheduledThreadPoolMaxSize = locator.getScheduledThreadPoolMaxSize(); + this.initialConnectAttempts = locator.getInitialConnectAttempts(); + this.initialMessagePacketSize = locator.getInitialMessagePacketSize(); + + this.clientFailureCheckPeriod = locator.getClientFailureCheckPeriod(); + this.callTimeout = locator.getCallTimeout(); + this.callFailoverTimeout = locator.getCallFailoverTimeout(); + this.connectionTTL = locator.getConnectionTTL(); + this.retryInterval = locator.getRetryInterval(); + this.maxRetryInterval = locator.getMaxRetryInterval(); + + this.retryIntervalMultiplier = locator.getRetryIntervalMultiplier(); + + this.connectionLoadBalancingPolicyClassName = locator.getConnectionLoadBalancingPolicyClassName(); } @Override diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/ra/ResourceAdapterTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/ra/ResourceAdapterTest.java index 6216bb354d..fa7d0c8b5a 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/ra/ResourceAdapterTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/ra/ResourceAdapterTest.java @@ -797,6 +797,40 @@ public class ResourceAdapterTest extends ActiveMQRATestBase { } } + @Test + public void testConnectionFactoryPropertiesApplyToRecoveryConfig() throws Exception { + ServerLocator locator = createInVMNonHALocator(); + ClientSessionFactory factory = locator.createSessionFactory(); + ClientSession session = factory.createSession(false, false, false); + ActiveMQDestination queue = (ActiveMQDestination) ActiveMQJMSClient.createQueue("test"); + session.createQueue(queue.getSimpleAddress(), queue.getSimpleAddress(), true); + session.close(); + + ActiveMQResourceAdapter ra = new ActiveMQResourceAdapter(); + + ra.setConnectorClassName(INVM_CONNECTOR_FACTORY); + ra.setUserName("userGlobal"); + ra.setPassword("passwordGlobal"); + ra.setConnectionTTL(100L); + ra.setCallFailoverTimeout(100L); + ra.start(new BootstrapContext()); + + Set resources = ra.getRecoveryManager().getResources(); + assertEquals(100L, ra.getDefaultActiveMQConnectionFactory().getServerLocator().getConnectionTTL()); + assertEquals(100L, ra.getDefaultActiveMQConnectionFactory().getServerLocator().getCallFailoverTimeout()); + + + for (XARecoveryConfig resource : resources) { + assertEquals(100L, resource.createServerLocator().getConnectionTTL()); + assertEquals(100L, resource.createServerLocator().getCallFailoverTimeout()); + } + + ra.stop(); + assertEquals(0, resources.size()); + locator.close(); + + } + @Override public boolean useSecurity() { return false;