[ARTEMIS-2176] RA connection properties are not propagated to XARecoveryConfig

This commit is contained in:
Bartosz Spyrko-Smietanko 2018-11-13 15:50:33 +00:00
parent 6a04a33e6e
commit eb41be78f3
2 changed files with 173 additions and 8 deletions

View File

@ -45,16 +45,41 @@ public class XARecoveryConfig {
private final Map<String, String> properties;
private final ClientProtocolManagerFactory clientProtocolManager;
// ServerLocator properties
private Long callFailoverTimeout;
private Long callTimeout;
private Long clientFailureCheckPeriod;
private Integer confirmationWindowSize;
private String connectionLoadBalancingPolicyClassName;
private Long connectionTTL;
private Integer consumerMaxRate;
private Integer consumerWindowSize;
private Integer initialConnectAttempts;
private Integer producerMaxRate;
private Integer producerWindowSize;
private Integer minLargeMessageSize;
private Long retryInterval;
private Double retryIntervalMultiplier;
private Long maxRetryInterval;
private Integer reconnectAttempts;
private Integer initialMessagePacketSize;
private Integer scheduledThreadPoolMaxSize;
private Integer threadPoolMaxSize;
private boolean autoGroup;
private boolean blockOnAcknowledge;
private boolean blockOnNonDurableSend;
private boolean blockOnDurableSend;
private boolean preAcknowledge;
private boolean useGlobalPools;
private boolean cacheLargeMessagesClient;
private boolean compressLargeMessage;
private boolean failoverOnInitialConnection;
public static XARecoveryConfig newConfig(ActiveMQConnectionFactory factory,
String userName,
String password,
Map<String, String> properties) {
if (factory.getServerLocator().getDiscoveryGroupConfiguration() != null) {
return new XARecoveryConfig(factory.getServerLocator().isHA(), factory.getServerLocator().getDiscoveryGroupConfiguration(), userName, password, properties, factory.getServerLocator().getProtocolManagerFactory());
} else {
return new XARecoveryConfig(factory.getServerLocator().isHA(), factory.getServerLocator().getStaticTransportConfigurations(), userName, password, properties, factory.getServerLocator().getProtocolManagerFactory());
}
return new XARecoveryConfig(factory.getServerLocator(), userName, password, properties);
}
public XARecoveryConfig(final boolean ha,
@ -112,6 +137,37 @@ public class XARecoveryConfig {
this(ha, discoveryConfiguration, username, password, properties, null);
}
private XARecoveryConfig(ServerLocator serverLocator,
String username,
String password,
Map<String, String> properties) {
ClientProtocolManagerFactory clientProtocolManager = serverLocator.getProtocolManagerFactory();
if (serverLocator.getDiscoveryGroupConfiguration() != null) {
this.discoveryConfiguration = serverLocator.getDiscoveryGroupConfiguration();
this.transportConfiguration = null;
} else {
TransportConfiguration[] transportConfiguration = serverLocator.getStaticTransportConfigurations();
TransportConfiguration[] newTransportConfiguration = new TransportConfiguration[transportConfiguration.length];
for (int i = 0; i < transportConfiguration.length; i++) {
if (clientProtocolManager != null) {
newTransportConfiguration[i] = clientProtocolManager.adaptTransportConfiguration(transportConfiguration[i].newTransportConfig(""));
} else {
newTransportConfiguration[i] = transportConfiguration[i].newTransportConfig("");
}
}
this.transportConfiguration = newTransportConfiguration;
this.discoveryConfiguration = null;
}
this.username = username;
this.password = password;
this.ha = serverLocator.isHA();
this.properties = properties == null ? Collections.unmodifiableMap(new HashMap<String, String>()) : Collections.unmodifiableMap(properties);
this.clientProtocolManager = clientProtocolManager;
readLocatorProperties(serverLocator);
}
public boolean isHA() {
return ha;
}
@ -146,12 +202,87 @@ public class XARecoveryConfig {
* @return locator
*/
public ServerLocator createServerLocator() {
ServerLocator serverLocator;
if (getDiscoveryConfiguration() != null) {
return ActiveMQClient.createServerLocator(isHA(), getDiscoveryConfiguration()).setProtocolManagerFactory(clientProtocolManager);
serverLocator = ActiveMQClient.createServerLocator(isHA(), getDiscoveryConfiguration()).setProtocolManagerFactory(clientProtocolManager);
} else {
return ActiveMQClient.createServerLocator(isHA(), getTransportConfig()).setProtocolManagerFactory(clientProtocolManager);
serverLocator = ActiveMQClient.createServerLocator(isHA(), getTransportConfig()).setProtocolManagerFactory(clientProtocolManager);
}
writeLocatorProperties(serverLocator);
return serverLocator;
}
private void writeLocatorProperties(ServerLocator serverLocator) {
serverLocator.setAutoGroup(this.autoGroup);
serverLocator.setBlockOnAcknowledge(this.blockOnAcknowledge);
serverLocator.setBlockOnNonDurableSend(this.blockOnNonDurableSend);
serverLocator.setBlockOnDurableSend(this.blockOnDurableSend);
serverLocator.setPreAcknowledge(this.preAcknowledge);
serverLocator.setUseGlobalPools(this.useGlobalPools);
serverLocator.setCacheLargeMessagesClient(this.cacheLargeMessagesClient);
serverLocator.setCompressLargeMessage(this.compressLargeMessage);
serverLocator.setFailoverOnInitialConnection(this.failoverOnInitialConnection);
serverLocator.setConsumerMaxRate(this.consumerMaxRate);
serverLocator.setConsumerWindowSize(this.consumerWindowSize);
serverLocator.setMinLargeMessageSize(this.minLargeMessageSize);
serverLocator.setProducerMaxRate(this.producerMaxRate);
serverLocator.setProducerWindowSize(this.producerWindowSize);
serverLocator.setConfirmationWindowSize(this.confirmationWindowSize);
serverLocator.setReconnectAttempts(this.reconnectAttempts);
serverLocator.setThreadPoolMaxSize(this.threadPoolMaxSize);
serverLocator.setScheduledThreadPoolMaxSize(this.scheduledThreadPoolMaxSize);
serverLocator.setInitialConnectAttempts(this.initialConnectAttempts);
serverLocator.setInitialMessagePacketSize(this.initialMessagePacketSize);
serverLocator.setClientFailureCheckPeriod(this.clientFailureCheckPeriod);
serverLocator.setCallTimeout(this.callTimeout);
serverLocator.setCallFailoverTimeout(this.callFailoverTimeout);
serverLocator.setConnectionTTL(this.connectionTTL);
serverLocator.setRetryInterval(this.retryInterval);
serverLocator.setMaxRetryInterval(this.maxRetryInterval);
serverLocator.setRetryIntervalMultiplier(this.retryIntervalMultiplier);
serverLocator.setConnectionLoadBalancingPolicyClassName(this.connectionLoadBalancingPolicyClassName);
}
private void readLocatorProperties(ServerLocator locator) {
this.autoGroup = locator.isAutoGroup();
this.blockOnAcknowledge = locator.isBlockOnAcknowledge();
this.blockOnNonDurableSend = locator.isBlockOnNonDurableSend();
this.blockOnDurableSend = locator.isBlockOnDurableSend();
this.preAcknowledge = locator.isPreAcknowledge();
this.useGlobalPools = locator.isUseGlobalPools();
this.cacheLargeMessagesClient = locator.isCacheLargeMessagesClient();
this.compressLargeMessage = locator.isCompressLargeMessage();
this.failoverOnInitialConnection = locator.isFailoverOnInitialConnection();
this.consumerMaxRate = locator.getConsumerMaxRate();
this.consumerWindowSize = locator.getConsumerWindowSize();
this.minLargeMessageSize = locator.getMinLargeMessageSize();
this.producerMaxRate = locator.getProducerMaxRate();
this.producerWindowSize = locator.getProducerWindowSize();
this.confirmationWindowSize = locator.getConfirmationWindowSize();
this.reconnectAttempts = locator.getReconnectAttempts();
this.threadPoolMaxSize = locator.getThreadPoolMaxSize();
this.scheduledThreadPoolMaxSize = locator.getScheduledThreadPoolMaxSize();
this.initialConnectAttempts = locator.getInitialConnectAttempts();
this.initialMessagePacketSize = locator.getInitialMessagePacketSize();
this.clientFailureCheckPeriod = locator.getClientFailureCheckPeriod();
this.callTimeout = locator.getCallTimeout();
this.callFailoverTimeout = locator.getCallFailoverTimeout();
this.connectionTTL = locator.getConnectionTTL();
this.retryInterval = locator.getRetryInterval();
this.maxRetryInterval = locator.getMaxRetryInterval();
this.retryIntervalMultiplier = locator.getRetryIntervalMultiplier();
this.connectionLoadBalancingPolicyClassName = locator.getConnectionLoadBalancingPolicyClassName();
}
@Override

View File

@ -797,6 +797,40 @@ public class ResourceAdapterTest extends ActiveMQRATestBase {
}
}
@Test
public void testConnectionFactoryPropertiesApplyToRecoveryConfig() throws Exception {
ServerLocator locator = createInVMNonHALocator();
ClientSessionFactory factory = locator.createSessionFactory();
ClientSession session = factory.createSession(false, false, false);
ActiveMQDestination queue = (ActiveMQDestination) ActiveMQJMSClient.createQueue("test");
session.createQueue(queue.getSimpleAddress(), queue.getSimpleAddress(), true);
session.close();
ActiveMQResourceAdapter ra = new ActiveMQResourceAdapter();
ra.setConnectorClassName(INVM_CONNECTOR_FACTORY);
ra.setUserName("userGlobal");
ra.setPassword("passwordGlobal");
ra.setConnectionTTL(100L);
ra.setCallFailoverTimeout(100L);
ra.start(new BootstrapContext());
Set<XARecoveryConfig> resources = ra.getRecoveryManager().getResources();
assertEquals(100L, ra.getDefaultActiveMQConnectionFactory().getServerLocator().getConnectionTTL());
assertEquals(100L, ra.getDefaultActiveMQConnectionFactory().getServerLocator().getCallFailoverTimeout());
for (XARecoveryConfig resource : resources) {
assertEquals(100L, resource.createServerLocator().getConnectionTTL());
assertEquals(100L, resource.createServerLocator().getCallFailoverTimeout());
}
ra.stop();
assertEquals(0, resources.size());
locator.close();
}
@Override
public boolean useSecurity() {
return false;