ARTEMIS-2716 Refactoring

This commit is contained in:
Howard Gao 2020-03-03 18:48:18 +08:00 committed by Bartosz Spyrko-Smietanko
parent eb41be78f3
commit cb8da54110
5 changed files with 202 additions and 343 deletions

View File

@ -0,0 +1,86 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.api.config;
import org.apache.activemq.artemis.api.core.client.ActiveMQClient;
public class ServerLocatorConfig {
public long clientFailureCheckPeriod = ActiveMQClient.DEFAULT_CLIENT_FAILURE_CHECK_PERIOD;
public long connectionTTL = ActiveMQClient.DEFAULT_CONNECTION_TTL;
public long callTimeout = ActiveMQClient.DEFAULT_CALL_TIMEOUT;
public long callFailoverTimeout = ActiveMQClient.DEFAULT_CALL_FAILOVER_TIMEOUT;
public int minLargeMessageSize = ActiveMQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE;
public int consumerWindowSize = ActiveMQClient.DEFAULT_CONSUMER_WINDOW_SIZE;
public int consumerMaxRate = ActiveMQClient.DEFAULT_CONSUMER_MAX_RATE;
public int confirmationWindowSize = ActiveMQClient.DEFAULT_CONFIRMATION_WINDOW_SIZE;
public int producerWindowSize = ActiveMQClient.DEFAULT_PRODUCER_WINDOW_SIZE;
public int producerMaxRate = ActiveMQClient.DEFAULT_PRODUCER_MAX_RATE;
public boolean blockOnAcknowledge = ActiveMQClient.DEFAULT_BLOCK_ON_ACKNOWLEDGE;
public boolean blockOnDurableSend = ActiveMQClient.DEFAULT_BLOCK_ON_DURABLE_SEND;
public boolean blockOnNonDurableSend = ActiveMQClient.DEFAULT_BLOCK_ON_NON_DURABLE_SEND;
public boolean autoGroup = ActiveMQClient.DEFAULT_AUTO_GROUP;
public boolean preAcknowledge = ActiveMQClient.DEFAULT_PRE_ACKNOWLEDGE;
public int ackBatchSize = ActiveMQClient.DEFAULT_ACK_BATCH_SIZE;
public String connectionLoadBalancingPolicyClassName = ActiveMQClient.DEFAULT_CONNECTION_LOAD_BALANCING_POLICY_CLASS_NAME;
public boolean useGlobalPools = ActiveMQClient.DEFAULT_USE_GLOBAL_POOLS;
public int threadPoolMaxSize = ActiveMQClient.DEFAULT_THREAD_POOL_MAX_SIZE;
public int scheduledThreadPoolMaxSize = ActiveMQClient.DEFAULT_SCHEDULED_THREAD_POOL_MAX_SIZE;
public long retryInterval = ActiveMQClient.DEFAULT_RETRY_INTERVAL;
public double retryIntervalMultiplier = ActiveMQClient.DEFAULT_RETRY_INTERVAL_MULTIPLIER;
public long maxRetryInterval = ActiveMQClient.DEFAULT_MAX_RETRY_INTERVAL;
public int reconnectAttempts = ActiveMQClient.DEFAULT_RECONNECT_ATTEMPTS;
public int initialConnectAttempts = ActiveMQClient.INITIAL_CONNECT_ATTEMPTS;
public int initialMessagePacketSize = ActiveMQClient.DEFAULT_INITIAL_MESSAGE_PACKET_SIZE;
public boolean cacheLargeMessagesClient = ActiveMQClient.DEFAULT_CACHE_LARGE_MESSAGE_CLIENT;
public boolean compressLargeMessage = ActiveMQClient.DEFAULT_COMPRESS_LARGE_MESSAGES;
public boolean useTopologyForLoadBalancing = ActiveMQClient.DEFAULT_USE_TOPOLOGY_FOR_LOADBALANCING;
public ServerLocatorConfig() {
}
public ServerLocatorConfig(final ServerLocatorConfig locator) {
compressLargeMessage = locator.compressLargeMessage;
cacheLargeMessagesClient = locator.cacheLargeMessagesClient;
clientFailureCheckPeriod = locator.clientFailureCheckPeriod;
connectionTTL = locator.connectionTTL;
callTimeout = locator.callTimeout;
callFailoverTimeout = locator.callFailoverTimeout;
minLargeMessageSize = locator.minLargeMessageSize;
consumerWindowSize = locator.consumerWindowSize;
consumerMaxRate = locator.consumerMaxRate;
confirmationWindowSize = locator.confirmationWindowSize;
producerWindowSize = locator.producerWindowSize;
producerMaxRate = locator.producerMaxRate;
blockOnAcknowledge = locator.blockOnAcknowledge;
blockOnDurableSend = locator.blockOnDurableSend;
blockOnNonDurableSend = locator.blockOnNonDurableSend;
autoGroup = locator.autoGroup;
preAcknowledge = locator.preAcknowledge;
connectionLoadBalancingPolicyClassName = locator.connectionLoadBalancingPolicyClassName;
ackBatchSize = locator.ackBatchSize;
useGlobalPools = locator.useGlobalPools;
scheduledThreadPoolMaxSize = locator.scheduledThreadPoolMaxSize;
threadPoolMaxSize = locator.threadPoolMaxSize;
retryInterval = locator.retryInterval;
retryIntervalMultiplier = locator.retryIntervalMultiplier;
maxRetryInterval = locator.maxRetryInterval;
reconnectAttempts = locator.reconnectAttempts;
initialConnectAttempts = locator.initialConnectAttempts;
initialMessagePacketSize = locator.initialMessagePacketSize;
useTopologyForLoadBalancing = locator.useTopologyForLoadBalancing;
}
}

View File

@ -19,6 +19,7 @@ package org.apache.activemq.artemis.api.core.client;
import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledExecutorService;
import org.apache.activemq.artemis.api.config.ServerLocatorConfig;
import org.apache.activemq.artemis.api.core.ActiveMQException;
import org.apache.activemq.artemis.api.core.DiscoveryGroupConfiguration;
import org.apache.activemq.artemis.api.core.Interceptor;
@ -822,4 +823,8 @@ public interface ServerLocator extends AutoCloseable {
/** This will only instantiate internal objects such as the topology */
void initialize() throws ActiveMQException;
ServerLocatorConfig getLocatorConfig();
void setLocatorConfig(ServerLocatorConfig serverLocatorConfig);
}

View File

@ -32,6 +32,7 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import org.apache.activemq.artemis.api.config.ServerLocatorConfig;
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
import org.apache.activemq.artemis.api.core.ActiveMQException;
import org.apache.activemq.artemis.api.core.ActiveMQInterruptedException;
@ -156,13 +157,7 @@ public class ClientSessionFactoryImpl implements ClientSessionFactoryInternal, C
public ClientSessionFactoryImpl(final ServerLocatorInternal serverLocator,
final TransportConfiguration connectorConfig,
final long callTimeout,
final long callFailoverTimeout,
final long clientFailureCheckPeriod,
final long connectionTTL,
final long retryInterval,
final double retryIntervalMultiplier,
final long maxRetryInterval,
final ServerLocatorConfig locatorConfig,
final int reconnectAttempts,
final Executor threadPool,
final ScheduledExecutorService scheduledThreadPool,
@ -182,27 +177,27 @@ public class ClientSessionFactoryImpl implements ClientSessionFactoryInternal, C
checkTransportKeys(connectorFactory, connectorConfig);
this.callTimeout = callTimeout;
this.callTimeout = locatorConfig.callTimeout;
this.callFailoverTimeout = callFailoverTimeout;
this.callFailoverTimeout = locatorConfig.callFailoverTimeout;
// HORNETQ-1314 - if this in an in-vm connection then disable connection monitoring
if (connectorFactory.isReliable() &&
clientFailureCheckPeriod == ActiveMQClient.DEFAULT_CLIENT_FAILURE_CHECK_PERIOD &&
connectionTTL == ActiveMQClient.DEFAULT_CONNECTION_TTL) {
locatorConfig.clientFailureCheckPeriod == ActiveMQClient.DEFAULT_CLIENT_FAILURE_CHECK_PERIOD &&
locatorConfig.connectionTTL == ActiveMQClient.DEFAULT_CONNECTION_TTL) {
this.clientFailureCheckPeriod = ActiveMQClient.DEFAULT_CLIENT_FAILURE_CHECK_PERIOD_INVM;
this.connectionTTL = ActiveMQClient.DEFAULT_CONNECTION_TTL_INVM;
} else {
this.clientFailureCheckPeriod = clientFailureCheckPeriod;
this.clientFailureCheckPeriod = locatorConfig.clientFailureCheckPeriod;
this.connectionTTL = connectionTTL;
this.connectionTTL = locatorConfig.connectionTTL;
}
this.retryInterval = retryInterval;
this.retryInterval = locatorConfig.retryInterval;
this.retryIntervalMultiplier = retryIntervalMultiplier;
this.retryIntervalMultiplier = locatorConfig.retryIntervalMultiplier;
this.maxRetryInterval = maxRetryInterval;
this.maxRetryInterval = locatorConfig.maxRetryInterval;
this.reconnectAttempts = reconnectAttempts;

View File

@ -38,6 +38,7 @@ import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import org.apache.activemq.artemis.api.config.ServerLocatorConfig;
import org.apache.activemq.artemis.api.core.ActiveMQException;
import org.apache.activemq.artemis.api.core.ActiveMQExceptionType;
import org.apache.activemq.artemis.api.core.ActiveMQIllegalStateException;
@ -122,7 +123,6 @@ public final class ServerLocatorImpl implements ServerLocatorInternal, Discovery
private volatile boolean receivedTopology;
private boolean compressLargeMessage;
/** This specifies serverLocator.connect was used,
* which means it's a cluster connection.
@ -144,59 +144,6 @@ public final class ServerLocatorImpl implements ServerLocatorInternal, Discovery
// Settable attributes:
private boolean cacheLargeMessagesClient;
private long clientFailureCheckPeriod;
private long connectionTTL;
private long callTimeout;
private long callFailoverTimeout;
private int minLargeMessageSize;
private int consumerWindowSize;
private int consumerMaxRate;
private int confirmationWindowSize;
private int producerWindowSize;
private int producerMaxRate;
private boolean blockOnAcknowledge;
private boolean blockOnDurableSend;
private boolean blockOnNonDurableSend;
private boolean autoGroup;
private boolean preAcknowledge;
private String connectionLoadBalancingPolicyClassName;
private int ackBatchSize;
private boolean useGlobalPools;
private int scheduledThreadPoolMaxSize;
private int threadPoolMaxSize;
private long retryInterval;
private double retryIntervalMultiplier;
private long maxRetryInterval;
private int reconnectAttempts;
private int initialConnectAttempts;
private int initialMessagePacketSize;
private final Object stateGuard = new Object();
private transient STATE state;
@ -227,6 +174,8 @@ public final class ServerLocatorImpl implements ServerLocatorInternal, Discovery
private final Exception traceException = new Exception();
private ServerLocatorConfig config = new ServerLocatorConfig();
public static synchronized void clearThreadPools() {
ActiveMQClient.clearThreadPools();
}
@ -234,7 +183,7 @@ public final class ServerLocatorImpl implements ServerLocatorInternal, Discovery
private synchronized void setThreadPools() {
if (threadPool != null) {
return;
} else if (useGlobalPools) {
} else if (config.useGlobalPools) {
threadPool = ActiveMQClient.getGlobalThreadPool();
scheduledThreadPool = ActiveMQClient.getGlobalScheduledThreadPool();
@ -248,10 +197,10 @@ public final class ServerLocatorImpl implements ServerLocatorInternal, Discovery
}
});
if (threadPoolMaxSize == -1) {
if (config.threadPoolMaxSize == -1) {
threadPool = Executors.newCachedThreadPool(factory);
} else {
threadPool = new ActiveMQThreadPoolExecutor(0, threadPoolMaxSize, 60L, TimeUnit.SECONDS, factory);
threadPool = new ActiveMQThreadPoolExecutor(0, config.threadPoolMaxSize, 60L, TimeUnit.SECONDS, factory);
}
factory = AccessController.doPrivileged(new PrivilegedAction<ThreadFactory>() {
@ -261,7 +210,7 @@ public final class ServerLocatorImpl implements ServerLocatorInternal, Discovery
}
});
scheduledThreadPool = Executors.newScheduledThreadPool(scheduledThreadPoolMaxSize, factory);
scheduledThreadPool = Executors.newScheduledThreadPool(config.scheduledThreadPoolMaxSize, factory);
}
this.updateArrayActor = new Actor<>(threadPool, this::internalUpdateArray);
}
@ -273,7 +222,7 @@ public final class ServerLocatorImpl implements ServerLocatorInternal, Discovery
return false;
if (this.threadPool == null && this.scheduledThreadPool == null) {
useGlobalPools = false;
config.useGlobalPools = false;
shutdownPool = false;
this.threadPool = threadPool;
this.scheduledThreadPool = scheduledThreadPool;
@ -284,13 +233,13 @@ public final class ServerLocatorImpl implements ServerLocatorInternal, Discovery
}
private void instantiateLoadBalancingPolicy() {
if (connectionLoadBalancingPolicyClassName == null) {
if (config.connectionLoadBalancingPolicyClassName == null) {
throw new IllegalStateException("Please specify a load balancing policy class name on the session factory");
}
AccessController.doPrivileged(new PrivilegedAction<Object>() {
@Override
public Object run() {
loadBalancingPolicy = (ConnectionLoadBalancingPolicy) ClassloadingUtil.newInstanceFromClassLoader(ServerLocatorImpl.class, connectionLoadBalancingPolicyClassName);
loadBalancingPolicy = (ConnectionLoadBalancingPolicy) ClassloadingUtil.newInstanceFromClassLoader(ServerLocatorImpl.class, config.connectionLoadBalancingPolicyClassName);
return null;
}
});
@ -336,6 +285,16 @@ public final class ServerLocatorImpl implements ServerLocatorInternal, Discovery
}
}
@Override
public ServerLocatorConfig getLocatorConfig() {
return config;
}
@Override
public void setLocatorConfig(ServerLocatorConfig config) {
this.config = config;
}
private static DiscoveryGroup createDiscoveryGroup(String nodeID,
DiscoveryGroupConfiguration config) throws Exception {
return new DiscoveryGroup(nodeID, config.getName(), config.getRefreshTimeout(), config.getBroadcastEndpointFactory(), null);
@ -357,67 +316,7 @@ public final class ServerLocatorImpl implements ServerLocatorInternal, Discovery
this.nodeID = UUIDGenerator.getInstance().generateStringUUID();
clientFailureCheckPeriod = ActiveMQClient.DEFAULT_CLIENT_FAILURE_CHECK_PERIOD;
connectionTTL = ActiveMQClient.DEFAULT_CONNECTION_TTL;
callTimeout = ActiveMQClient.DEFAULT_CALL_TIMEOUT;
callFailoverTimeout = ActiveMQClient.DEFAULT_CALL_FAILOVER_TIMEOUT;
minLargeMessageSize = ActiveMQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE;
consumerWindowSize = ActiveMQClient.DEFAULT_CONSUMER_WINDOW_SIZE;
consumerMaxRate = ActiveMQClient.DEFAULT_CONSUMER_MAX_RATE;
confirmationWindowSize = ActiveMQClient.DEFAULT_CONFIRMATION_WINDOW_SIZE;
producerWindowSize = ActiveMQClient.DEFAULT_PRODUCER_WINDOW_SIZE;
producerMaxRate = ActiveMQClient.DEFAULT_PRODUCER_MAX_RATE;
blockOnAcknowledge = ActiveMQClient.DEFAULT_BLOCK_ON_ACKNOWLEDGE;
blockOnDurableSend = ActiveMQClient.DEFAULT_BLOCK_ON_DURABLE_SEND;
blockOnNonDurableSend = ActiveMQClient.DEFAULT_BLOCK_ON_NON_DURABLE_SEND;
autoGroup = ActiveMQClient.DEFAULT_AUTO_GROUP;
preAcknowledge = ActiveMQClient.DEFAULT_PRE_ACKNOWLEDGE;
ackBatchSize = ActiveMQClient.DEFAULT_ACK_BATCH_SIZE;
connectionLoadBalancingPolicyClassName = ActiveMQClient.DEFAULT_CONNECTION_LOAD_BALANCING_POLICY_CLASS_NAME;
useGlobalPools = ActiveMQClient.DEFAULT_USE_GLOBAL_POOLS;
threadPoolMaxSize = ActiveMQClient.DEFAULT_THREAD_POOL_MAX_SIZE;
scheduledThreadPoolMaxSize = ActiveMQClient.DEFAULT_SCHEDULED_THREAD_POOL_MAX_SIZE;
retryInterval = ActiveMQClient.DEFAULT_RETRY_INTERVAL;
retryIntervalMultiplier = ActiveMQClient.DEFAULT_RETRY_INTERVAL_MULTIPLIER;
maxRetryInterval = ActiveMQClient.DEFAULT_MAX_RETRY_INTERVAL;
reconnectAttempts = ActiveMQClient.DEFAULT_RECONNECT_ATTEMPTS;
initialConnectAttempts = ActiveMQClient.INITIAL_CONNECT_ATTEMPTS;
cacheLargeMessagesClient = ActiveMQClient.DEFAULT_CACHE_LARGE_MESSAGE_CLIENT;
initialMessagePacketSize = ActiveMQClient.DEFAULT_INITIAL_MESSAGE_PACKET_SIZE;
cacheLargeMessagesClient = ActiveMQClient.DEFAULT_CACHE_LARGE_MESSAGE_CLIENT;
compressLargeMessage = ActiveMQClient.DEFAULT_COMPRESS_LARGE_MESSAGES;
clusterConnection = false;
useTopologyForLoadBalancing = ActiveMQClient.DEFAULT_USE_TOPOLOGY_FOR_LOADBALANCING;
}
public static ServerLocator newLocator(String uri) {
@ -515,40 +414,12 @@ public final class ServerLocatorImpl implements ServerLocatorInternal, Discovery
topology = locator.topology;
topologyArray = locator.topologyArray;
receivedTopology = locator.receivedTopology;
compressLargeMessage = locator.compressLargeMessage;
cacheLargeMessagesClient = locator.cacheLargeMessagesClient;
clientFailureCheckPeriod = locator.clientFailureCheckPeriod;
connectionTTL = locator.connectionTTL;
callTimeout = locator.callTimeout;
callFailoverTimeout = locator.callFailoverTimeout;
minLargeMessageSize = locator.minLargeMessageSize;
consumerWindowSize = locator.consumerWindowSize;
consumerMaxRate = locator.consumerMaxRate;
confirmationWindowSize = locator.confirmationWindowSize;
producerWindowSize = locator.producerWindowSize;
producerMaxRate = locator.producerMaxRate;
blockOnAcknowledge = locator.blockOnAcknowledge;
blockOnDurableSend = locator.blockOnDurableSend;
blockOnNonDurableSend = locator.blockOnNonDurableSend;
autoGroup = locator.autoGroup;
preAcknowledge = locator.preAcknowledge;
connectionLoadBalancingPolicyClassName = locator.connectionLoadBalancingPolicyClassName;
ackBatchSize = locator.ackBatchSize;
useGlobalPools = locator.useGlobalPools;
scheduledThreadPoolMaxSize = locator.scheduledThreadPoolMaxSize;
threadPoolMaxSize = locator.threadPoolMaxSize;
retryInterval = locator.retryInterval;
retryIntervalMultiplier = locator.retryIntervalMultiplier;
maxRetryInterval = locator.maxRetryInterval;
reconnectAttempts = locator.reconnectAttempts;
initialConnectAttempts = locator.initialConnectAttempts;
initialMessagePacketSize = locator.initialMessagePacketSize;
config = new ServerLocatorConfig(locator.config);
startExecutor = locator.startExecutor;
afterConnectListener = locator.afterConnectListener;
groupID = locator.groupID;
nodeID = locator.nodeID;
clusterTransportConfiguration = locator.clusterTransportConfiguration;
useTopologyForLoadBalancing = locator.useTopologyForLoadBalancing;
}
private TransportConfiguration selectConnector() {
@ -561,7 +432,7 @@ public final class ServerLocatorImpl implements ServerLocatorInternal, Discovery
}
synchronized (this) {
if (usedTopology != null && useTopologyForLoadBalancing) {
if (usedTopology != null && config.useTopologyForLoadBalancing) {
if (logger.isTraceEnabled()) {
logger.trace("Selecting connector from topology.");
}
@ -591,7 +462,7 @@ public final class ServerLocatorImpl implements ServerLocatorInternal, Discovery
}
synchronized (this) {
if (usedTopology != null && useTopologyForLoadBalancing) {
if (usedTopology != null && config.useTopologyForLoadBalancing) {
return usedTopology.length;
} else {
return initialConnectors.length;
@ -721,7 +592,7 @@ public final class ServerLocatorImpl implements ServerLocatorInternal, Discovery
@Override
public ClientSessionFactory createSessionFactory(final TransportConfiguration transportConfiguration) throws Exception {
return createSessionFactory(transportConfiguration, reconnectAttempts);
return createSessionFactory(transportConfiguration, config.reconnectAttempts);
}
@Override
@ -731,7 +602,7 @@ public final class ServerLocatorImpl implements ServerLocatorInternal, Discovery
initialize();
ClientSessionFactoryInternal factory = new ClientSessionFactoryImpl(this, transportConfiguration, callTimeout, callFailoverTimeout, clientFailureCheckPeriod, connectionTTL, retryInterval, retryIntervalMultiplier, maxRetryInterval, reconnectAttempts, threadPool, scheduledThreadPool, incomingInterceptors, outgoingInterceptors);
ClientSessionFactoryInternal factory = new ClientSessionFactoryImpl(this, transportConfiguration, config, reconnectAttempts, threadPool, scheduledThreadPool, incomingInterceptors, outgoingInterceptors);
addToConnecting(factory);
try {
@ -798,7 +669,7 @@ public final class ServerLocatorImpl implements ServerLocatorInternal, Discovery
// try each factory in the list until we find one which works
try {
factory = new ClientSessionFactoryImpl(this, tc, callTimeout, callFailoverTimeout, clientFailureCheckPeriod, connectionTTL, retryInterval, retryIntervalMultiplier, maxRetryInterval, reconnectAttempts, threadPool, scheduledThreadPool, incomingInterceptors, outgoingInterceptors);
factory = new ClientSessionFactoryImpl(this, tc, config, config.reconnectAttempts, threadPool, scheduledThreadPool, incomingInterceptors, outgoingInterceptors);
try {
addToConnecting(factory);
// We always try to connect here with only one attempt,
@ -813,12 +684,12 @@ public final class ServerLocatorImpl implements ServerLocatorInternal, Discovery
attempts++;
int connectorsSize = getConnectorsSize();
int maxAttempts = initialConnectAttempts == 0 ? 1 : initialConnectAttempts;
int maxAttempts = config.initialConnectAttempts == 0 ? 1 : config.initialConnectAttempts;
if (initialConnectAttempts >= 0 && attempts >= maxAttempts * connectorsSize) {
if (config.initialConnectAttempts >= 0 && attempts >= maxAttempts * connectorsSize) {
throw ActiveMQClientMessageBundle.BUNDLE.cannotConnectToServers();
}
if (factory.waitForRetry(retryInterval)) {
if (factory.waitForRetry(config.retryInterval)) {
throw ActiveMQClientMessageBundle.BUNDLE.cannotConnectToServers();
}
retry = true;
@ -836,7 +707,7 @@ public final class ServerLocatorImpl implements ServerLocatorInternal, Discovery
// ATM topology is never != null. Checking here just to be consistent with
// how the sendSubscription happens.
// in case this ever changes.
if (topology != null && !factory.waitForTopology(callTimeout, TimeUnit.MILLISECONDS)) {
if (topology != null && !factory.waitForTopology(config.callTimeout, TimeUnit.MILLISECONDS)) {
factory.cleanup();
throw ActiveMQClientMessageBundle.BUNDLE.connectionTimedOutOnReceiveTopology(discoveryGroup);
}
@ -855,12 +726,12 @@ public final class ServerLocatorImpl implements ServerLocatorInternal, Discovery
discoveryOK = checkOnDiscovery();
retryDiscovery = (initialConnectAttempts > 0 && tryNumber++ < initialConnectAttempts) && !disableDiscoveryRetries;
retryDiscovery = (config.initialConnectAttempts > 0 && tryNumber++ < config.initialConnectAttempts) && !disableDiscoveryRetries;
if (!discoveryOK) {
if (retryDiscovery) {
ActiveMQClientLogger.LOGGER.broadcastTimeout(tryNumber, initialConnectAttempts);
ActiveMQClientLogger.LOGGER.broadcastTimeout(tryNumber, config.initialConnectAttempts);
} else {
throw ActiveMQClientMessageBundle.BUNDLE.connectionTimedOutInInitialBroadcast();
}
@ -960,301 +831,301 @@ public final class ServerLocatorImpl implements ServerLocatorInternal, Discovery
@Override
public boolean isCacheLargeMessagesClient() {
return cacheLargeMessagesClient;
return config.cacheLargeMessagesClient;
}
@Override
public ServerLocatorImpl setCacheLargeMessagesClient(final boolean cached) {
cacheLargeMessagesClient = cached;
config.cacheLargeMessagesClient = cached;
return this;
}
@Override
public long getClientFailureCheckPeriod() {
return clientFailureCheckPeriod;
return config.clientFailureCheckPeriod;
}
@Override
public ServerLocatorImpl setClientFailureCheckPeriod(final long clientFailureCheckPeriod) {
checkWrite();
this.clientFailureCheckPeriod = clientFailureCheckPeriod;
this.config.clientFailureCheckPeriod = clientFailureCheckPeriod;
return this;
}
@Override
public long getConnectionTTL() {
return connectionTTL;
return config.connectionTTL;
}
@Override
public ServerLocatorImpl setConnectionTTL(final long connectionTTL) {
checkWrite();
this.connectionTTL = connectionTTL;
this.config.connectionTTL = connectionTTL;
return this;
}
@Override
public long getCallTimeout() {
return callTimeout;
return config.callTimeout;
}
@Override
public ServerLocatorImpl setCallTimeout(final long callTimeout) {
checkWrite();
this.callTimeout = callTimeout;
this.config.callTimeout = callTimeout;
return this;
}
@Override
public long getCallFailoverTimeout() {
return callFailoverTimeout;
return config.callFailoverTimeout;
}
@Override
public ServerLocatorImpl setCallFailoverTimeout(long callFailoverTimeout) {
checkWrite();
this.callFailoverTimeout = callFailoverTimeout;
this.config.callFailoverTimeout = callFailoverTimeout;
return this;
}
@Override
public int getMinLargeMessageSize() {
return minLargeMessageSize;
return config.minLargeMessageSize;
}
@Override
public ServerLocatorImpl setMinLargeMessageSize(final int minLargeMessageSize) {
checkWrite();
this.minLargeMessageSize = minLargeMessageSize;
this.config.minLargeMessageSize = minLargeMessageSize;
return this;
}
@Override
public int getConsumerWindowSize() {
return consumerWindowSize;
return config.consumerWindowSize;
}
@Override
public ServerLocatorImpl setConsumerWindowSize(final int consumerWindowSize) {
checkWrite();
this.consumerWindowSize = consumerWindowSize;
this.config.consumerWindowSize = consumerWindowSize;
return this;
}
@Override
public int getConsumerMaxRate() {
return consumerMaxRate;
return config.consumerMaxRate;
}
@Override
public ServerLocatorImpl setConsumerMaxRate(final int consumerMaxRate) {
checkWrite();
this.consumerMaxRate = consumerMaxRate;
this.config.consumerMaxRate = consumerMaxRate;
return this;
}
@Override
public int getConfirmationWindowSize() {
return confirmationWindowSize;
return config.confirmationWindowSize;
}
@Override
public ServerLocatorImpl setConfirmationWindowSize(final int confirmationWindowSize) {
checkWrite();
this.confirmationWindowSize = confirmationWindowSize;
this.config.confirmationWindowSize = confirmationWindowSize;
return this;
}
@Override
public int getProducerWindowSize() {
return producerWindowSize;
return config.producerWindowSize;
}
@Override
public ServerLocatorImpl setProducerWindowSize(final int producerWindowSize) {
checkWrite();
this.producerWindowSize = producerWindowSize;
this.config.producerWindowSize = producerWindowSize;
return this;
}
@Override
public int getProducerMaxRate() {
return producerMaxRate;
return config.producerMaxRate;
}
@Override
public ServerLocatorImpl setProducerMaxRate(final int producerMaxRate) {
checkWrite();
this.producerMaxRate = producerMaxRate;
this.config.producerMaxRate = producerMaxRate;
return this;
}
@Override
public boolean isBlockOnAcknowledge() {
return blockOnAcknowledge;
return config.blockOnAcknowledge;
}
@Override
public ServerLocatorImpl setBlockOnAcknowledge(final boolean blockOnAcknowledge) {
checkWrite();
this.blockOnAcknowledge = blockOnAcknowledge;
this.config.blockOnAcknowledge = blockOnAcknowledge;
return this;
}
@Override
public boolean isBlockOnDurableSend() {
return blockOnDurableSend;
return config.blockOnDurableSend;
}
@Override
public ServerLocatorImpl setBlockOnDurableSend(final boolean blockOnDurableSend) {
checkWrite();
this.blockOnDurableSend = blockOnDurableSend;
this.config.blockOnDurableSend = blockOnDurableSend;
return this;
}
@Override
public boolean isBlockOnNonDurableSend() {
return blockOnNonDurableSend;
return config.blockOnNonDurableSend;
}
@Override
public ServerLocatorImpl setBlockOnNonDurableSend(final boolean blockOnNonDurableSend) {
checkWrite();
this.blockOnNonDurableSend = blockOnNonDurableSend;
this.config.blockOnNonDurableSend = blockOnNonDurableSend;
return this;
}
@Override
public boolean isAutoGroup() {
return autoGroup;
return config.autoGroup;
}
@Override
public ServerLocatorImpl setAutoGroup(final boolean autoGroup) {
checkWrite();
this.autoGroup = autoGroup;
this.config.autoGroup = autoGroup;
return this;
}
@Override
public boolean isPreAcknowledge() {
return preAcknowledge;
return config.preAcknowledge;
}
@Override
public ServerLocatorImpl setPreAcknowledge(final boolean preAcknowledge) {
checkWrite();
this.preAcknowledge = preAcknowledge;
this.config.preAcknowledge = preAcknowledge;
return this;
}
@Override
public int getAckBatchSize() {
return ackBatchSize;
return config.ackBatchSize;
}
@Override
public ServerLocatorImpl setAckBatchSize(final int ackBatchSize) {
checkWrite();
this.ackBatchSize = ackBatchSize;
this.config.ackBatchSize = ackBatchSize;
return this;
}
@Override
public boolean isUseGlobalPools() {
return useGlobalPools;
return config.useGlobalPools;
}
@Override
public ServerLocatorImpl setUseGlobalPools(final boolean useGlobalPools) {
checkWrite();
this.useGlobalPools = useGlobalPools;
this.config.useGlobalPools = useGlobalPools;
return this;
}
@Override
public int getScheduledThreadPoolMaxSize() {
return scheduledThreadPoolMaxSize;
return config.scheduledThreadPoolMaxSize;
}
@Override
public ServerLocatorImpl setScheduledThreadPoolMaxSize(final int scheduledThreadPoolMaxSize) {
checkWrite();
this.scheduledThreadPoolMaxSize = scheduledThreadPoolMaxSize;
this.config.scheduledThreadPoolMaxSize = scheduledThreadPoolMaxSize;
return this;
}
@Override
public int getThreadPoolMaxSize() {
return threadPoolMaxSize;
return config.threadPoolMaxSize;
}
@Override
public ServerLocatorImpl setThreadPoolMaxSize(final int threadPoolMaxSize) {
checkWrite();
this.threadPoolMaxSize = threadPoolMaxSize;
this.config.threadPoolMaxSize = threadPoolMaxSize;
return this;
}
@Override
public long getRetryInterval() {
return retryInterval;
return config.retryInterval;
}
@Override
public ServerLocatorImpl setRetryInterval(final long retryInterval) {
checkWrite();
this.retryInterval = retryInterval;
this.config.retryInterval = retryInterval;
return this;
}
@Override
public long getMaxRetryInterval() {
return maxRetryInterval;
return config.maxRetryInterval;
}
@Override
public ServerLocatorImpl setMaxRetryInterval(final long retryInterval) {
checkWrite();
maxRetryInterval = retryInterval;
this.config.maxRetryInterval = retryInterval;
return this;
}
@Override
public double getRetryIntervalMultiplier() {
return retryIntervalMultiplier;
return config.retryIntervalMultiplier;
}
@Override
public ServerLocatorImpl setRetryIntervalMultiplier(final double retryIntervalMultiplier) {
checkWrite();
this.retryIntervalMultiplier = retryIntervalMultiplier;
this.config.retryIntervalMultiplier = retryIntervalMultiplier;
return this;
}
@Override
public int getReconnectAttempts() {
return reconnectAttempts;
return config.reconnectAttempts;
}
@Override
public ServerLocatorImpl setReconnectAttempts(final int reconnectAttempts) {
checkWrite();
this.reconnectAttempts = reconnectAttempts;
this.config.reconnectAttempts = reconnectAttempts;
return this;
}
@Override
public ServerLocatorImpl setInitialConnectAttempts(int initialConnectAttempts) {
checkWrite();
this.initialConnectAttempts = initialConnectAttempts;
this.config.initialConnectAttempts = initialConnectAttempts;
return this;
}
@Override
public int getInitialConnectAttempts() {
return initialConnectAttempts;
return config.initialConnectAttempts;
}
@Deprecated
@ -1271,13 +1142,13 @@ public final class ServerLocatorImpl implements ServerLocatorInternal, Discovery
@Override
public String getConnectionLoadBalancingPolicyClassName() {
return connectionLoadBalancingPolicyClassName;
return config.connectionLoadBalancingPolicyClassName;
}
@Override
public ServerLocatorImpl setConnectionLoadBalancingPolicyClassName(final String loadBalancingPolicyClassName) {
checkWrite();
connectionLoadBalancingPolicyClassName = loadBalancingPolicyClassName;
config.connectionLoadBalancingPolicyClassName = loadBalancingPolicyClassName;
return this;
}
@ -1317,13 +1188,13 @@ public final class ServerLocatorImpl implements ServerLocatorInternal, Discovery
@Override
public int getInitialMessagePacketSize() {
return initialMessagePacketSize;
return config.initialMessagePacketSize;
}
@Override
public ServerLocatorImpl setInitialMessagePacketSize(final int size) {
checkWrite();
initialMessagePacketSize = size;
config.initialMessagePacketSize = size;
return this;
}
@ -1341,12 +1212,12 @@ public final class ServerLocatorImpl implements ServerLocatorInternal, Discovery
@Override
public boolean isCompressLargeMessage() {
return compressLargeMessage;
return config.compressLargeMessage;
}
@Override
public ServerLocatorImpl setCompressLargeMessage(boolean avoid) {
this.compressLargeMessage = avoid;
this.config.compressLargeMessage = avoid;
return this;
}
@ -1692,13 +1563,13 @@ public final class ServerLocatorImpl implements ServerLocatorInternal, Discovery
@Override
public ServerLocator setUseTopologyForLoadBalancing(boolean useTopologyForLoadBalancing) {
this.useTopologyForLoadBalancing = useTopologyForLoadBalancing;
this.config.useTopologyForLoadBalancing = useTopologyForLoadBalancing;
return this;
}
@Override
public boolean getUseTopologyForLoadBalancing() {
return useTopologyForLoadBalancing;
return config.useTopologyForLoadBalancing;
}
@Override
@ -1820,11 +1691,11 @@ public final class ServerLocatorImpl implements ServerLocatorInternal, Discovery
}
}
if (initialConnectAttempts >= 0 && retryNumber > initialConnectAttempts) {
if (config.initialConnectAttempts >= 0 && retryNumber > config.initialConnectAttempts) {
break;
}
if (latch.await(retryInterval, TimeUnit.MILLISECONDS))
if (latch.await(config.retryInterval, TimeUnit.MILLISECONDS))
return null;
}
@ -1859,7 +1730,7 @@ public final class ServerLocatorImpl implements ServerLocatorInternal, Discovery
connectors = new ArrayList<>();
if (initialConnectors != null) {
for (TransportConfiguration initialConnector : initialConnectors) {
ClientSessionFactoryInternal factory = new ClientSessionFactoryImpl(ServerLocatorImpl.this, initialConnector, callTimeout, callFailoverTimeout, clientFailureCheckPeriod, connectionTTL, retryInterval, retryIntervalMultiplier, maxRetryInterval, reconnectAttempts, threadPool, scheduledThreadPool, incomingInterceptors, outgoingInterceptors);
ClientSessionFactoryInternal factory = new ClientSessionFactoryImpl(ServerLocatorImpl.this, initialConnector, config, config.reconnectAttempts, threadPool, scheduledThreadPool, incomingInterceptors, outgoingInterceptors);
factory.disableFinalizeCheck();

View File

@ -21,6 +21,7 @@ import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import org.apache.activemq.artemis.api.config.ServerLocatorConfig;
import org.apache.activemq.artemis.api.core.DiscoveryGroupConfiguration;
import org.apache.activemq.artemis.api.core.TransportConfiguration;
import org.apache.activemq.artemis.api.core.client.ActiveMQClient;
@ -44,36 +45,7 @@ public class XARecoveryConfig {
private final String password;
private final Map<String, String> properties;
private final ClientProtocolManagerFactory clientProtocolManager;
// ServerLocator properties
private Long callFailoverTimeout;
private Long callTimeout;
private Long clientFailureCheckPeriod;
private Integer confirmationWindowSize;
private String connectionLoadBalancingPolicyClassName;
private Long connectionTTL;
private Integer consumerMaxRate;
private Integer consumerWindowSize;
private Integer initialConnectAttempts;
private Integer producerMaxRate;
private Integer producerWindowSize;
private Integer minLargeMessageSize;
private Long retryInterval;
private Double retryIntervalMultiplier;
private Long maxRetryInterval;
private Integer reconnectAttempts;
private Integer initialMessagePacketSize;
private Integer scheduledThreadPoolMaxSize;
private Integer threadPoolMaxSize;
private boolean autoGroup;
private boolean blockOnAcknowledge;
private boolean blockOnNonDurableSend;
private boolean blockOnDurableSend;
private boolean preAcknowledge;
private boolean useGlobalPools;
private boolean cacheLargeMessagesClient;
private boolean compressLargeMessage;
private boolean failoverOnInitialConnection;
private ServerLocatorConfig locatorConfig;
public static XARecoveryConfig newConfig(ActiveMQConnectionFactory factory,
String userName,
@ -102,7 +74,7 @@ public class XARecoveryConfig {
this.username = username;
this.password = password;
this.ha = ha;
this.properties = properties == null ? Collections.unmodifiableMap(new HashMap<String, String>()) : Collections.unmodifiableMap(properties);
this.properties = properties == null ? new HashMap<>() : properties;
this.clientProtocolManager = clientProtocolManager;
}
@ -164,8 +136,7 @@ public class XARecoveryConfig {
this.ha = serverLocator.isHA();
this.properties = properties == null ? Collections.unmodifiableMap(new HashMap<String, String>()) : Collections.unmodifiableMap(properties);
this.clientProtocolManager = clientProtocolManager;
readLocatorProperties(serverLocator);
this.locatorConfig = serverLocator.getLocatorConfig();
}
public boolean isHA() {
@ -209,80 +180,11 @@ public class XARecoveryConfig {
serverLocator = ActiveMQClient.createServerLocator(isHA(), getTransportConfig()).setProtocolManagerFactory(clientProtocolManager);
}
writeLocatorProperties(serverLocator);
return serverLocator;
if (this.locatorConfig != null) {
serverLocator.setLocatorConfig(new ServerLocatorConfig(this.locatorConfig));
}
private void writeLocatorProperties(ServerLocator serverLocator) {
serverLocator.setAutoGroup(this.autoGroup);
serverLocator.setBlockOnAcknowledge(this.blockOnAcknowledge);
serverLocator.setBlockOnNonDurableSend(this.blockOnNonDurableSend);
serverLocator.setBlockOnDurableSend(this.blockOnDurableSend);
serverLocator.setPreAcknowledge(this.preAcknowledge);
serverLocator.setUseGlobalPools(this.useGlobalPools);
serverLocator.setCacheLargeMessagesClient(this.cacheLargeMessagesClient);
serverLocator.setCompressLargeMessage(this.compressLargeMessage);
serverLocator.setFailoverOnInitialConnection(this.failoverOnInitialConnection);
serverLocator.setConsumerMaxRate(this.consumerMaxRate);
serverLocator.setConsumerWindowSize(this.consumerWindowSize);
serverLocator.setMinLargeMessageSize(this.minLargeMessageSize);
serverLocator.setProducerMaxRate(this.producerMaxRate);
serverLocator.setProducerWindowSize(this.producerWindowSize);
serverLocator.setConfirmationWindowSize(this.confirmationWindowSize);
serverLocator.setReconnectAttempts(this.reconnectAttempts);
serverLocator.setThreadPoolMaxSize(this.threadPoolMaxSize);
serverLocator.setScheduledThreadPoolMaxSize(this.scheduledThreadPoolMaxSize);
serverLocator.setInitialConnectAttempts(this.initialConnectAttempts);
serverLocator.setInitialMessagePacketSize(this.initialMessagePacketSize);
serverLocator.setClientFailureCheckPeriod(this.clientFailureCheckPeriod);
serverLocator.setCallTimeout(this.callTimeout);
serverLocator.setCallFailoverTimeout(this.callFailoverTimeout);
serverLocator.setConnectionTTL(this.connectionTTL);
serverLocator.setRetryInterval(this.retryInterval);
serverLocator.setMaxRetryInterval(this.maxRetryInterval);
serverLocator.setRetryIntervalMultiplier(this.retryIntervalMultiplier);
serverLocator.setConnectionLoadBalancingPolicyClassName(this.connectionLoadBalancingPolicyClassName);
}
private void readLocatorProperties(ServerLocator locator) {
this.autoGroup = locator.isAutoGroup();
this.blockOnAcknowledge = locator.isBlockOnAcknowledge();
this.blockOnNonDurableSend = locator.isBlockOnNonDurableSend();
this.blockOnDurableSend = locator.isBlockOnDurableSend();
this.preAcknowledge = locator.isPreAcknowledge();
this.useGlobalPools = locator.isUseGlobalPools();
this.cacheLargeMessagesClient = locator.isCacheLargeMessagesClient();
this.compressLargeMessage = locator.isCompressLargeMessage();
this.failoverOnInitialConnection = locator.isFailoverOnInitialConnection();
this.consumerMaxRate = locator.getConsumerMaxRate();
this.consumerWindowSize = locator.getConsumerWindowSize();
this.minLargeMessageSize = locator.getMinLargeMessageSize();
this.producerMaxRate = locator.getProducerMaxRate();
this.producerWindowSize = locator.getProducerWindowSize();
this.confirmationWindowSize = locator.getConfirmationWindowSize();
this.reconnectAttempts = locator.getReconnectAttempts();
this.threadPoolMaxSize = locator.getThreadPoolMaxSize();
this.scheduledThreadPoolMaxSize = locator.getScheduledThreadPoolMaxSize();
this.initialConnectAttempts = locator.getInitialConnectAttempts();
this.initialMessagePacketSize = locator.getInitialMessagePacketSize();
this.clientFailureCheckPeriod = locator.getClientFailureCheckPeriod();
this.callTimeout = locator.getCallTimeout();
this.callFailoverTimeout = locator.getCallFailoverTimeout();
this.connectionTTL = locator.getConnectionTTL();
this.retryInterval = locator.getRetryInterval();
this.maxRetryInterval = locator.getMaxRetryInterval();
this.retryIntervalMultiplier = locator.getRetryIntervalMultiplier();
this.connectionLoadBalancingPolicyClassName = locator.getConnectionLoadBalancingPolicyClassName();
return serverLocator;
}
@Override