This closes #600

This commit is contained in:
Clebert Suconic 2016-06-27 10:10:11 -04:00
commit 7d69d913e4
13 changed files with 492 additions and 133 deletions

View File

@ -301,6 +301,9 @@ public final class ActiveMQDefaultConfiguration {
// Once the bridge has received this many bytes, it sends a confirmation
private static int DEFAULT_BRIDGE_CONFIRMATION_WINDOW_SIZE = 1048576;
// Producer flow control
private static int DEFAULT_BRIDGE_PRODUCER_WINDOW_SIZE = -1;
// Upon reconnection this configures the number of time the same node on the topology will be retried before reseting the server locator and using the initial connectors
private static int DEFAULT_BRIDGE_CONNECT_SAME_NODE = 10;
@ -840,6 +843,13 @@ public final class ActiveMQDefaultConfiguration {
return DEFAULT_BRIDGE_CONFIRMATION_WINDOW_SIZE;
}
/**
* Producer flow control
*/
public static int getDefaultBridgeProducerWindowSize() {
return DEFAULT_BRIDGE_PRODUCER_WINDOW_SIZE;
}
/**
* Upon reconnection this configures the number of time the same node on the topology will be retried before reseting the server locator and using the initial connectors
*/

View File

@ -698,6 +698,7 @@ public interface ActiveMQServerControl {
@Parameter(name = "reconnectAttempts", desc = "Number of reconnection attempts") int reconnectAttempts,
@Parameter(name = "useDuplicateDetection", desc = "Use duplicate detection") boolean useDuplicateDetection,
@Parameter(name = "confirmationWindowSize", desc = "Confirmation window size") int confirmationWindowSize,
@Parameter(name = "producerWindowSize", desc = "Producer window size") int producerWindowSize,
@Parameter(name = "clientFailureCheckPeriod", desc = "Period to check client failure") long clientFailureCheckPeriod,
@Parameter(name = "staticConnectorNames", desc = "comma separated list of connector names or name of discovery group if 'useDiscoveryGroup' is set to true") String connectorNames,
@Parameter(name = "useDiscoveryGroup", desc = "use discovery group") boolean useDiscoveryGroup,

View File

@ -56,6 +56,9 @@ public final class BridgeConfiguration implements Serializable {
private int confirmationWindowSize = ActiveMQClient.DEFAULT_CONFIRMATION_WINDOW_SIZE;
// disable flow control
private int producerWindowSize = ActiveMQDefaultConfiguration.getDefaultBridgeProducerWindowSize();
private long clientFailureCheckPeriod = ActiveMQClient.DEFAULT_CLIENT_FAILURE_CHECK_PERIOD;
private String user = ActiveMQDefaultConfiguration.getDefaultClusterUser();
@ -267,6 +270,18 @@ public final class BridgeConfiguration implements Serializable {
return this;
}
public int getProducerWindowSize() {
return producerWindowSize;
}
/**
* @param producerWindowSize the producerWindowSize to set
*/
public BridgeConfiguration setProducerWindowSize(final int producerWindowSize) {
this.producerWindowSize = producerWindowSize;
return this;
}
public long getClientFailureCheckPeriod() {
return clientFailureCheckPeriod;
}
@ -340,6 +355,7 @@ public final class BridgeConfiguration implements Serializable {
result = prime * result + (int) (callTimeout ^ (callTimeout >>> 32));
result = prime * result + (int) (clientFailureCheckPeriod ^ (clientFailureCheckPeriod >>> 32));
result = prime * result + confirmationWindowSize;
result = prime * result + producerWindowSize;
result = prime * result + (int) (connectionTTL ^ (connectionTTL >>> 32));
result = prime * result + ((discoveryGroupName == null) ? 0 : discoveryGroupName.hashCode());
result = prime * result + ((filterString == null) ? 0 : filterString.hashCode());
@ -378,6 +394,8 @@ public final class BridgeConfiguration implements Serializable {
return false;
if (confirmationWindowSize != other.confirmationWindowSize)
return false;
if (producerWindowSize != other.producerWindowSize)
return false;
if (connectionTTL != other.connectionTTL)
return false;
if (discoveryGroupName == null) {

View File

@ -62,7 +62,8 @@ public final class ClusterConnectionConfiguration implements Serializable {
private boolean duplicateDetection = ActiveMQDefaultConfiguration.isDefaultClusterDuplicateDetection();
private MessageLoadBalancingType messageLoadBalancingType = Enum.valueOf(MessageLoadBalancingType.class, ActiveMQDefaultConfiguration.getDefaultClusterMessageLoadBalancingType());
private MessageLoadBalancingType messageLoadBalancingType = Enum.valueOf(MessageLoadBalancingType.class, ActiveMQDefaultConfiguration
.getDefaultClusterMessageLoadBalancingType());
private URISupport.CompositeData compositeMembers;
@ -74,6 +75,8 @@ public final class ClusterConnectionConfiguration implements Serializable {
private int confirmationWindowSize = ActiveMQDefaultConfiguration.getDefaultClusterConfirmationWindowSize();
private int producerWindowSize = ActiveMQDefaultConfiguration.getDefaultBridgeProducerWindowSize();
private boolean allowDirectConnectionsOnly = false;
private int minLargeMessageSize = ActiveMQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE;
@ -108,15 +111,15 @@ public final class ClusterConnectionConfiguration implements Serializable {
return this;
}
public URISupport.CompositeData getCompositeMembers() {
return compositeMembers;
}
public ClusterConnectionConfiguration setCompositeMembers(URISupport.CompositeData members) {
this.compositeMembers = members;
return this;
}
public URISupport.CompositeData getCompositeMembers() {
return compositeMembers;
}
/**
* @return the clientFailureCheckPeriod
*/
@ -124,6 +127,14 @@ public final class ClusterConnectionConfiguration implements Serializable {
return clientFailureCheckPeriod;
}
/**
* @param clientFailureCheckPeriod the clientFailureCheckPeriod to set
*/
public ClusterConnectionConfiguration setClientFailureCheckPeriod(long clientFailureCheckPeriod) {
this.clientFailureCheckPeriod = clientFailureCheckPeriod;
return this;
}
/**
* @return the connectionTTL
*/
@ -131,6 +142,14 @@ public final class ClusterConnectionConfiguration implements Serializable {
return connectionTTL;
}
/**
* @param connectionTTL the connectionTTL to set
*/
public ClusterConnectionConfiguration setConnectionTTL(long connectionTTL) {
this.connectionTTL = connectionTTL;
return this;
}
/**
* @return the retryIntervalMultiplier
*/
@ -138,6 +157,14 @@ public final class ClusterConnectionConfiguration implements Serializable {
return retryIntervalMultiplier;
}
/**
* @param retryIntervalMultiplier the retryIntervalMultiplier to set
*/
public ClusterConnectionConfiguration setRetryIntervalMultiplier(double retryIntervalMultiplier) {
this.retryIntervalMultiplier = retryIntervalMultiplier;
return this;
}
/**
* @return the maxRetryInterval
*/
@ -145,6 +172,14 @@ public final class ClusterConnectionConfiguration implements Serializable {
return maxRetryInterval;
}
/**
* @param maxRetryInterval the maxRetryInterval to set
*/
public ClusterConnectionConfiguration setMaxRetryInterval(long maxRetryInterval) {
this.maxRetryInterval = maxRetryInterval;
return this;
}
/**
* @return the initialConnectAttempts
*/
@ -152,6 +187,14 @@ public final class ClusterConnectionConfiguration implements Serializable {
return initialConnectAttempts;
}
/**
* @param initialConnectAttempts the reconnectAttempts to set
*/
public ClusterConnectionConfiguration setInitialConnectAttempts(int initialConnectAttempts) {
this.initialConnectAttempts = initialConnectAttempts;
return this;
}
/**
* @return the reconnectAttempts
*/
@ -159,14 +202,38 @@ public final class ClusterConnectionConfiguration implements Serializable {
return reconnectAttempts;
}
/**
* @param reconnectAttempts the reconnectAttempts to set
*/
public ClusterConnectionConfiguration setReconnectAttempts(int reconnectAttempts) {
this.reconnectAttempts = reconnectAttempts;
return this;
}
public long getCallTimeout() {
return callTimeout;
}
/**
* @param callTimeout the callTimeout to set
*/
public ClusterConnectionConfiguration setCallTimeout(long callTimeout) {
this.callTimeout = callTimeout;
return this;
}
public long getCallFailoverTimeout() {
return callFailoverTimeout;
}
/**
* @param callFailoverTimeout the callTimeout to set
*/
public ClusterConnectionConfiguration setCallFailoverTimeout(long callFailoverTimeout) {
this.callFailoverTimeout = callFailoverTimeout;
return this;
}
public String getConnectorName() {
return connectorName;
}
@ -180,10 +247,27 @@ public final class ClusterConnectionConfiguration implements Serializable {
return duplicateDetection;
}
/**
* @param duplicateDetection the duplicateDetection to set
*/
public ClusterConnectionConfiguration setDuplicateDetection(boolean duplicateDetection) {
this.duplicateDetection = duplicateDetection;
return this;
}
public MessageLoadBalancingType getMessageLoadBalancingType() {
return messageLoadBalancingType;
}
/**
* @param messageLoadBalancingType
* @return
*/
public ClusterConnectionConfiguration setMessageLoadBalancingType(MessageLoadBalancingType messageLoadBalancingType) {
this.messageLoadBalancingType = messageLoadBalancingType;
return this;
}
public int getMaxHops() {
return maxHops;
}
@ -202,6 +286,15 @@ public final class ClusterConnectionConfiguration implements Serializable {
return this;
}
public int getProducerWindowSize() {
return producerWindowSize;
}
public ClusterConnectionConfiguration setProducerindowSize(int producerWindowSize) {
this.producerWindowSize = producerWindowSize;
return this;
}
public List<String> getStaticConnectors() {
return staticConnectors;
}
@ -224,6 +317,14 @@ public final class ClusterConnectionConfiguration implements Serializable {
return retryInterval;
}
/**
* @param retryInterval the retryInterval to set
*/
public ClusterConnectionConfiguration setRetryInterval(long retryInterval) {
this.retryInterval = retryInterval;
return this;
}
public boolean isAllowDirectConnectionsOnly() {
return allowDirectConnectionsOnly;
}
@ -248,95 +349,6 @@ public final class ClusterConnectionConfiguration implements Serializable {
return this;
}
/**
* @param clientFailureCheckPeriod the clientFailureCheckPeriod to set
*/
public ClusterConnectionConfiguration setClientFailureCheckPeriod(long clientFailureCheckPeriod) {
this.clientFailureCheckPeriod = clientFailureCheckPeriod;
return this;
}
/**
* @param connectionTTL the connectionTTL to set
*/
public ClusterConnectionConfiguration setConnectionTTL(long connectionTTL) {
this.connectionTTL = connectionTTL;
return this;
}
/**
* @param retryInterval the retryInterval to set
*/
public ClusterConnectionConfiguration setRetryInterval(long retryInterval) {
this.retryInterval = retryInterval;
return this;
}
/**
* @param retryIntervalMultiplier the retryIntervalMultiplier to set
*/
public ClusterConnectionConfiguration setRetryIntervalMultiplier(double retryIntervalMultiplier) {
this.retryIntervalMultiplier = retryIntervalMultiplier;
return this;
}
/**
* @param maxRetryInterval the maxRetryInterval to set
*/
public ClusterConnectionConfiguration setMaxRetryInterval(long maxRetryInterval) {
this.maxRetryInterval = maxRetryInterval;
return this;
}
/**
* @param initialConnectAttempts the reconnectAttempts to set
*/
public ClusterConnectionConfiguration setInitialConnectAttempts(int initialConnectAttempts) {
this.initialConnectAttempts = initialConnectAttempts;
return this;
}
/**
* @param reconnectAttempts the reconnectAttempts to set
*/
public ClusterConnectionConfiguration setReconnectAttempts(int reconnectAttempts) {
this.reconnectAttempts = reconnectAttempts;
return this;
}
/**
* @param callTimeout the callTimeout to set
*/
public ClusterConnectionConfiguration setCallTimeout(long callTimeout) {
this.callTimeout = callTimeout;
return this;
}
/**
* @param callFailoverTimeout the callTimeout to set
*/
public ClusterConnectionConfiguration setCallFailoverTimeout(long callFailoverTimeout) {
this.callFailoverTimeout = callFailoverTimeout;
return this;
}
/**
* @param duplicateDetection the duplicateDetection to set
*/
public ClusterConnectionConfiguration setDuplicateDetection(boolean duplicateDetection) {
this.duplicateDetection = duplicateDetection;
return this;
}
/**
* @param messageLoadBalancingType
* @return
*/
public ClusterConnectionConfiguration setMessageLoadBalancingType(MessageLoadBalancingType messageLoadBalancingType) {
this.messageLoadBalancingType = messageLoadBalancingType;
return this;
}
/*
* returns the cluster update interval
* */
@ -457,77 +469,107 @@ public final class ClusterConnectionConfiguration implements Serializable {
@Override
public boolean equals(Object obj) {
if (this == obj)
if (this == obj) {
return true;
if (obj == null)
}
if (obj == null) {
return false;
if (getClass() != obj.getClass())
}
if (getClass() != obj.getClass()) {
return false;
}
ClusterConnectionConfiguration other = (ClusterConnectionConfiguration) obj;
if (address == null) {
if (other.address != null)
if (other.address != null) {
return false;
}
else if (!address.equals(other.address))
}
else if (!address.equals(other.address)) {
return false;
if (allowDirectConnectionsOnly != other.allowDirectConnectionsOnly)
}
if (allowDirectConnectionsOnly != other.allowDirectConnectionsOnly) {
return false;
if (callFailoverTimeout != other.callFailoverTimeout)
}
if (callFailoverTimeout != other.callFailoverTimeout) {
return false;
if (callTimeout != other.callTimeout)
}
if (callTimeout != other.callTimeout) {
return false;
if (clientFailureCheckPeriod != other.clientFailureCheckPeriod)
}
if (clientFailureCheckPeriod != other.clientFailureCheckPeriod) {
return false;
if (clusterNotificationAttempts != other.clusterNotificationAttempts)
}
if (clusterNotificationAttempts != other.clusterNotificationAttempts) {
return false;
if (clusterNotificationInterval != other.clusterNotificationInterval)
}
if (clusterNotificationInterval != other.clusterNotificationInterval) {
return false;
if (confirmationWindowSize != other.confirmationWindowSize)
}
if (confirmationWindowSize != other.confirmationWindowSize) {
return false;
if (connectionTTL != other.connectionTTL)
}
if (connectionTTL != other.connectionTTL) {
return false;
}
if (connectorName == null) {
if (other.connectorName != null)
if (other.connectorName != null) {
return false;
}
else if (!connectorName.equals(other.connectorName))
}
else if (!connectorName.equals(other.connectorName)) {
return false;
}
if (discoveryGroupName == null) {
if (other.discoveryGroupName != null)
if (other.discoveryGroupName != null) {
return false;
}
else if (!discoveryGroupName.equals(other.discoveryGroupName))
}
else if (!discoveryGroupName.equals(other.discoveryGroupName)) {
return false;
if (duplicateDetection != other.duplicateDetection)
}
if (duplicateDetection != other.duplicateDetection) {
return false;
if (messageLoadBalancingType != other.messageLoadBalancingType)
}
if (messageLoadBalancingType != other.messageLoadBalancingType) {
return false;
if (maxHops != other.maxHops)
}
if (maxHops != other.maxHops) {
return false;
if (maxRetryInterval != other.maxRetryInterval)
}
if (maxRetryInterval != other.maxRetryInterval) {
return false;
if (minLargeMessageSize != other.minLargeMessageSize)
}
if (minLargeMessageSize != other.minLargeMessageSize) {
return false;
}
if (name == null) {
if (other.name != null)
if (other.name != null) {
return false;
}
else if (!name.equals(other.name))
}
else if (!name.equals(other.name)) {
return false;
if (initialConnectAttempts != other.initialConnectAttempts)
}
if (initialConnectAttempts != other.initialConnectAttempts) {
return false;
if (reconnectAttempts != other.reconnectAttempts)
}
if (reconnectAttempts != other.reconnectAttempts) {
return false;
if (retryInterval != other.retryInterval)
}
if (retryInterval != other.retryInterval) {
return false;
if (Double.doubleToLongBits(retryIntervalMultiplier) != Double.doubleToLongBits(other.retryIntervalMultiplier))
}
if (Double.doubleToLongBits(retryIntervalMultiplier) != Double.doubleToLongBits(other.retryIntervalMultiplier)) {
return false;
}
if (staticConnectors == null) {
if (other.staticConnectors != null)
if (other.staticConnectors != null) {
return false;
}
else if (!staticConnectors.equals(other.staticConnectors))
}
else if (!staticConnectors.equals(other.staticConnectors)) {
return false;
}
return true;
}

View File

@ -1314,6 +1314,8 @@ public final class FileConfigurationParser extends XMLConfigurationUtil {
int confirmationWindowSize = getInteger(e, "confirmation-window-size", ActiveMQDefaultConfiguration.getDefaultClusterConfirmationWindowSize(), Validators.GT_ZERO);
int producerWindowSize = getInteger(e, "producer-window-size", ActiveMQDefaultConfiguration.getDefaultBridgeProducerWindowSize(), Validators.MINUS_ONE_OR_GT_ZERO);
long clusterNotificationInterval = getLong(e, "notification-interval", ActiveMQDefaultConfiguration.getDefaultClusterNotificationInterval(), Validators.GT_ZERO);
int clusterNotificationAttempts = getInteger(e, "notification-attempts", ActiveMQDefaultConfiguration.getDefaultClusterNotificationAttempts(), Validators.GT_ZERO);
@ -1343,7 +1345,28 @@ public final class FileConfigurationParser extends XMLConfigurationUtil {
}
}
ClusterConnectionConfiguration config = new ClusterConnectionConfiguration().setName(name).setAddress(address).setConnectorName(connectorName).setMinLargeMessageSize(minLargeMessageSize).setClientFailureCheckPeriod(clientFailureCheckPeriod).setConnectionTTL(connectionTTL).setRetryInterval(retryInterval).setRetryIntervalMultiplier(retryIntervalMultiplier).setMaxRetryInterval(maxRetryInterval).setInitialConnectAttempts(initialConnectAttempts).setReconnectAttempts(reconnectAttempts).setCallTimeout(callTimeout).setCallFailoverTimeout(callFailoverTimeout).setDuplicateDetection(duplicateDetection).setMessageLoadBalancingType(messageLoadBalancingType).setMaxHops(maxHops).setConfirmationWindowSize(confirmationWindowSize).setAllowDirectConnectionsOnly(allowDirectConnectionsOnly).setClusterNotificationInterval(clusterNotificationInterval).setClusterNotificationAttempts(clusterNotificationAttempts);
ClusterConnectionConfiguration config = new ClusterConnectionConfiguration()
.setName(name)
.setAddress(address)
.setConnectorName(connectorName)
.setMinLargeMessageSize(minLargeMessageSize)
.setClientFailureCheckPeriod(clientFailureCheckPeriod)
.setConnectionTTL(connectionTTL)
.setRetryInterval(retryInterval)
.setRetryIntervalMultiplier(retryIntervalMultiplier)
.setMaxRetryInterval(maxRetryInterval)
.setInitialConnectAttempts(initialConnectAttempts)
.setReconnectAttempts(reconnectAttempts)
.setCallTimeout(callTimeout)
.setCallFailoverTimeout(callFailoverTimeout)
.setDuplicateDetection(duplicateDetection)
.setMessageLoadBalancingType(messageLoadBalancingType)
.setMaxHops(maxHops)
.setConfirmationWindowSize(confirmationWindowSize)
.setProducerindowSize(producerWindowSize)
.setAllowDirectConnectionsOnly(allowDirectConnectionsOnly)
.setClusterNotificationInterval(clusterNotificationInterval)
.setClusterNotificationAttempts(clusterNotificationAttempts);
if (discoveryGroupName == null) {
config.setStaticConnectors(staticConnectorNames);
@ -1377,6 +1400,8 @@ public final class FileConfigurationParser extends XMLConfigurationUtil {
// Default bridge conf
int confirmationWindowSize = getInteger(brNode, "confirmation-window-size", ActiveMQDefaultConfiguration.getDefaultBridgeConfirmationWindowSize(), Validators.GT_ZERO);
int producerWindowSize = getInteger(brNode, "producer-window-size", ActiveMQDefaultConfiguration.getDefaultBridgeConfirmationWindowSize(), Validators.GT_ZERO);
long retryInterval = getLong(brNode, "retry-interval", ActiveMQClient.DEFAULT_RETRY_INTERVAL, Validators.GT_ZERO);
long clientFailureCheckPeriod = getLong(brNode, "check-period", ActiveMQClient.DEFAULT_CLIENT_FAILURE_CHECK_PERIOD, Validators.GT_ZERO);
@ -1444,7 +1469,27 @@ public final class FileConfigurationParser extends XMLConfigurationUtil {
}
}
BridgeConfiguration config = new BridgeConfiguration().setName(name).setQueueName(queueName).setForwardingAddress(forwardingAddress).setFilterString(filterString).setTransformerClassName(transformerClassName).setMinLargeMessageSize(minLargeMessageSize).setClientFailureCheckPeriod(clientFailureCheckPeriod).setConnectionTTL(connectionTTL).setRetryInterval(retryInterval).setMaxRetryInterval(maxRetryInterval).setRetryIntervalMultiplier(retryIntervalMultiplier).setInitialConnectAttempts(initialConnectAttempts).setReconnectAttempts(reconnectAttempts).setReconnectAttemptsOnSameNode(reconnectAttemptsSameNode).setUseDuplicateDetection(useDuplicateDetection).setConfirmationWindowSize(confirmationWindowSize).setHA(ha).setUser(user).setPassword(password);
BridgeConfiguration config = new BridgeConfiguration()
.setName(name)
.setQueueName(queueName)
.setForwardingAddress(forwardingAddress)
.setFilterString(filterString)
.setTransformerClassName(transformerClassName)
.setMinLargeMessageSize(minLargeMessageSize)
.setClientFailureCheckPeriod(clientFailureCheckPeriod)
.setConnectionTTL(connectionTTL)
.setRetryInterval(retryInterval)
.setMaxRetryInterval(maxRetryInterval)
.setRetryIntervalMultiplier(retryIntervalMultiplier)
.setInitialConnectAttempts(initialConnectAttempts)
.setReconnectAttempts(reconnectAttempts)
.setReconnectAttemptsOnSameNode(reconnectAttemptsSameNode)
.setUseDuplicateDetection(useDuplicateDetection)
.setConfirmationWindowSize(confirmationWindowSize)
.setProducerWindowSize(producerWindowSize)
.setHA(ha)
.setUser(user)
.setPassword(password);
if (!staticConnectorNames.isEmpty()) {
config.setStaticConnectors(staticConnectorNames);

View File

@ -1722,6 +1722,7 @@ public class ActiveMQServerControlImpl extends AbstractControl implements Active
final int reconnectAttempts,
final boolean useDuplicateDetection,
final int confirmationWindowSize,
final int producerWindowSize,
final long clientFailureCheckPeriod,
final String staticConnectorsOrDiscoveryGroup,
boolean useDiscoveryGroup,
@ -1733,7 +1734,23 @@ public class ActiveMQServerControlImpl extends AbstractControl implements Active
clearIO();
try {
BridgeConfiguration config = new BridgeConfiguration().setName(name).setQueueName(queueName).setForwardingAddress(forwardingAddress).setFilterString(filterString).setTransformerClassName(transformerClassName).setClientFailureCheckPeriod(clientFailureCheckPeriod).setRetryInterval(retryInterval).setRetryIntervalMultiplier(retryIntervalMultiplier).setInitialConnectAttempts(initialConnectAttempts).setReconnectAttempts(reconnectAttempts).setUseDuplicateDetection(useDuplicateDetection).setConfirmationWindowSize(confirmationWindowSize).setHA(ha).setUser(user).setPassword(password);
BridgeConfiguration config = new BridgeConfiguration()
.setName(name)
.setQueueName(queueName)
.setForwardingAddress(forwardingAddress)
.setFilterString(filterString)
.setTransformerClassName(transformerClassName)
.setClientFailureCheckPeriod(clientFailureCheckPeriod)
.setRetryInterval(retryInterval)
.setRetryIntervalMultiplier(retryIntervalMultiplier)
.setInitialConnectAttempts(initialConnectAttempts)
.setReconnectAttempts(reconnectAttempts)
.setUseDuplicateDetection(useDuplicateDetection)
.setConfirmationWindowSize(confirmationWindowSize)
.setProducerWindowSize(producerWindowSize)
.setHA(ha)
.setUser(user)
.setPassword(password);
if (useDiscoveryGroup) {
config.setDiscoveryGroupName(staticConnectorsOrDiscoveryGroup);

View File

@ -465,8 +465,7 @@ public final class ClusterManager implements ActiveMQComponent {
serverLocator.setBlockOnDurableSend(!config.isUseDuplicateDetection());
serverLocator.setBlockOnNonDurableSend(!config.isUseDuplicateDetection());
serverLocator.setMinLargeMessageSize(config.getMinLargeMessageSize());
//disable flow control
serverLocator.setProducerWindowSize(-1);
serverLocator.setProducerWindowSize(config.getProducerWindowSize());
// This will be set to 30s unless it's changed from embedded / testing
// there is no reason to exception the config for this timeout
@ -615,7 +614,7 @@ public final class ClusterManager implements ActiveMQComponent {
dg);
}
clusterConnection = new ClusterConnectionImpl(this, dg, connector, new SimpleString(config.getName()), new SimpleString(config.getAddress()), config.getMinLargeMessageSize(), config.getClientFailureCheckPeriod(), config.getConnectionTTL(), config.getRetryInterval(), config.getRetryIntervalMultiplier(), config.getMaxRetryInterval(), config.getInitialConnectAttempts(), config.getReconnectAttempts(), config.getCallTimeout(), config.getCallFailoverTimeout(), config.isDuplicateDetection(), config.getMessageLoadBalancingType(), config.getConfirmationWindowSize(), executorFactory, server, postOffice, managementService, scheduledExecutor, config.getMaxHops(), nodeManager, server.getConfiguration().getClusterUser(), server.getConfiguration().getClusterPassword(), config.isAllowDirectConnectionsOnly(), config.getClusterNotificationInterval(), config.getClusterNotificationAttempts());
clusterConnection = new ClusterConnectionImpl(this, dg, connector, new SimpleString(config.getName()), new SimpleString(config.getAddress()), config.getMinLargeMessageSize(), config.getClientFailureCheckPeriod(), config.getConnectionTTL(), config.getRetryInterval(), config.getRetryIntervalMultiplier(), config.getMaxRetryInterval(), config.getInitialConnectAttempts(), config.getReconnectAttempts(), config.getCallTimeout(), config.getCallFailoverTimeout(), config.isDuplicateDetection(), config.getMessageLoadBalancingType(), config.getConfirmationWindowSize(), config.getProducerWindowSize(), executorFactory, server, postOffice, managementService, scheduledExecutor, config.getMaxHops(), nodeManager, server.getConfiguration().getClusterUser(), server.getConfiguration().getClusterPassword(), config.isAllowDirectConnectionsOnly(), config.getClusterNotificationInterval(), config.getClusterNotificationAttempts());
clusterController.addClusterConnection(clusterConnection.getName(), dg, config);
}
@ -626,7 +625,7 @@ public final class ClusterManager implements ActiveMQComponent {
logger.debug(this + " defining cluster connection towards " + Arrays.toString(tcConfigs));
}
clusterConnection = new ClusterConnectionImpl(this, tcConfigs, connector, new SimpleString(config.getName()), new SimpleString(config.getAddress()), config.getMinLargeMessageSize(), config.getClientFailureCheckPeriod(), config.getConnectionTTL(), config.getRetryInterval(), config.getRetryIntervalMultiplier(), config.getMaxRetryInterval(), config.getInitialConnectAttempts(), config.getReconnectAttempts(), config.getCallTimeout(), config.getCallFailoverTimeout(), config.isDuplicateDetection(), config.getMessageLoadBalancingType(), config.getConfirmationWindowSize(), executorFactory, server, postOffice, managementService, scheduledExecutor, config.getMaxHops(), nodeManager, server.getConfiguration().getClusterUser(), server.getConfiguration().getClusterPassword(), config.isAllowDirectConnectionsOnly(), config.getClusterNotificationInterval(), config.getClusterNotificationAttempts());
clusterConnection = new ClusterConnectionImpl(this, tcConfigs, connector, new SimpleString(config.getName()), new SimpleString(config.getAddress()), config.getMinLargeMessageSize(), config.getClientFailureCheckPeriod(), config.getConnectionTTL(), config.getRetryInterval(), config.getRetryIntervalMultiplier(), config.getMaxRetryInterval(), config.getInitialConnectAttempts(), config.getReconnectAttempts(), config.getCallTimeout(), config.getCallFailoverTimeout(), config.isDuplicateDetection(), config.getMessageLoadBalancingType(), config.getConfirmationWindowSize(), config.getProducerWindowSize(), executorFactory, server, postOffice, managementService, scheduledExecutor, config.getMaxHops(), nodeManager, server.getConfiguration().getClusterUser(), server.getConfiguration().getClusterPassword(), config.isAllowDirectConnectionsOnly(), config.getClusterNotificationInterval(), config.getClusterNotificationAttempts());
clusterController.addClusterConnection(clusterConnection.getName(), tcConfigs, config);
}

View File

@ -119,6 +119,8 @@ public final class ClusterConnectionImpl implements ClusterConnection, AfterConn
private final int confirmationWindowSize;
private final int producerWindowSize;
/**
* Guard for the field {@link #records}. Note that the field is {@link ConcurrentHashMap},
* however we need the guard to synchronize multiple step operations during topology updates.
@ -184,6 +186,7 @@ public final class ClusterConnectionImpl implements ClusterConnection, AfterConn
final boolean useDuplicateDetection,
final MessageLoadBalancingType messageLoadBalancingType,
final int confirmationWindowSize,
final int producerWindowSize,
final ExecutorFactory executorFactory,
final ActiveMQServer server,
final PostOffice postOffice,
@ -224,6 +227,8 @@ public final class ClusterConnectionImpl implements ClusterConnection, AfterConn
this.confirmationWindowSize = confirmationWindowSize;
this.producerWindowSize = producerWindowSize;
this.executorFactory = executorFactory;
this.clusterNotificationInterval = clusterNotificationInterval;
@ -290,6 +295,7 @@ public final class ClusterConnectionImpl implements ClusterConnection, AfterConn
final boolean useDuplicateDetection,
final MessageLoadBalancingType messageLoadBalancingType,
final int confirmationWindowSize,
final int producerWindowSize,
final ExecutorFactory executorFactory,
final ActiveMQServer server,
final PostOffice postOffice,
@ -336,6 +342,8 @@ public final class ClusterConnectionImpl implements ClusterConnection, AfterConn
this.confirmationWindowSize = confirmationWindowSize;
this.producerWindowSize = producerWindowSize;
this.executorFactory = executorFactory;
this.clusterNotificationInterval = clusterNotificationInterval;
@ -601,8 +609,7 @@ public final class ClusterConnectionImpl implements ClusterConnection, AfterConn
serverLocator.setBlockOnNonDurableSend(!useDuplicateDetection);
serverLocator.setCallTimeout(callTimeout);
serverLocator.setCallFailoverTimeout(callFailoverTimeout);
// No producer flow control on the bridges, as we don't want to lock the queues
serverLocator.setProducerWindowSize(-1);
serverLocator.setProducerWindowSize(producerWindowSize);
if (retryInterval > 0) {
this.serverLocator.setRetryInterval(retryInterval);

View File

@ -1116,6 +1116,14 @@
</xsd:annotation>
</xsd:element>
<xsd:element name="producer-window-size" type="xsd:int" maxOccurs="1" minOccurs="0" default="-1">
<xsd:annotation>
<xsd:documentation>
Producer flow control
</xsd:documentation>
</xsd:annotation>
</xsd:element>
<xsd:element name="user" type="xsd:string" maxOccurs="1" minOccurs="0">
<xsd:annotation>
<xsd:documentation>
@ -1341,6 +1349,14 @@
</xsd:annotation>
</xsd:element>
<xsd:element name="producer-window-size" type="xsd:int" maxOccurs="1" minOccurs="0" default="-1">
<xsd:annotation>
<xsd:documentation>
Producer flow control
</xsd:documentation>
</xsd:annotation>
</xsd:element>
<xsd:element name="call-failover-timeout" type="xsd:long" default="-1" maxOccurs="1" minOccurs="0">
<xsd:annotation>
<xsd:documentation>

View File

@ -215,6 +215,7 @@ public class FileConfigurationTest extends ConfigurationImplTest {
Assert.assertEquals(true, bc.isUseDuplicateDetection());
Assert.assertEquals("connector1", bc.getStaticConnectors().get(0));
Assert.assertEquals(null, bc.getDiscoveryGroupName());
Assert.assertEquals(444, bc.getProducerWindowSize());
}
else {
Assert.assertEquals("bridge2", bc.getName());
@ -224,6 +225,7 @@ public class FileConfigurationTest extends ConfigurationImplTest {
Assert.assertEquals(null, bc.getTransformerClassName());
Assert.assertEquals(null, bc.getStaticConnectors());
Assert.assertEquals("dg1", bc.getDiscoveryGroupName());
Assert.assertEquals(555, bc.getProducerWindowSize());
}
}
@ -256,6 +258,7 @@ public class FileConfigurationTest extends ConfigurationImplTest {
Assert.assertEquals("connector1", ccc.getStaticConnectors().get(0));
Assert.assertEquals("connector2", ccc.getStaticConnectors().get(1));
Assert.assertEquals(null, ccc.getDiscoveryGroupName());
Assert.assertEquals(222, ccc.getProducerWindowSize());
}
else {
Assert.assertEquals("cluster-connection2", ccc.getName());
@ -268,6 +271,7 @@ public class FileConfigurationTest extends ConfigurationImplTest {
Assert.assertEquals(2, ccc.getMaxHops());
Assert.assertEquals(Collections.emptyList(), ccc.getStaticConnectors());
Assert.assertEquals("dg1", ccc.getDiscoveryGroupName());
Assert.assertEquals(333, ccc.getProducerWindowSize());
}
}

View File

@ -142,6 +142,7 @@
<reconnect-attempts>2</reconnect-attempts>
<failover-on-server-shutdown>false</failover-on-server-shutdown>
<use-duplicate-detection>true</use-duplicate-detection>
<producer-window-size>444</producer-window-size>
<static-connectors>
<connector-ref>connector1</connector-ref>
</static-connectors>
@ -149,6 +150,7 @@
<bridge name="bridge2">
<queue-name>queue2</queue-name>
<forwarding-address>bridge-forwarding-address2</forwarding-address>
<producer-window-size>555</producer-window-size>
<discovery-group-ref discovery-group-name="dg1"/>
</bridge>
</bridges>
@ -180,6 +182,7 @@
<use-duplicate-detection>true</use-duplicate-detection>
<message-load-balancing>ON_DEMAND</message-load-balancing>
<max-hops>1</max-hops>
<producer-window-size>222</producer-window-size>
<call-failover-timeout>123</call-failover-timeout>
<static-connectors>
<connector-ref>connector1</connector-ref>
@ -194,6 +197,7 @@
<use-duplicate-detection>false</use-duplicate-detection>
<message-load-balancing>STRICT</message-load-balancing>
<max-hops>2</max-hops>
<producer-window-size>333</producer-window-size>
<call-failover-timeout>456</call-failover-timeout>
<discovery-group-ref discovery-group-name="dg1"/>
</cluster-connection>

View File

@ -16,6 +16,12 @@
*/
package org.apache.activemq.artemis.tests.integration.cluster.bridge;
import java.io.BufferedInputStream;
import java.io.BufferedOutputStream;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
@ -1620,6 +1626,195 @@ public class BridgeTest extends ActiveMQTestBase {
assertEquals(0, loadQueues(server0).size());
}
@Test
public void testBridgeWithVeryLargeMessage() throws Exception {
ActiveMQServer server0 = null;
ActiveMQServer server1 = null;
final int PAGE_MAX = 1024 * 1024;
final int PAGE_SIZE = 10 * 1024;
ServerLocator locator = null;
try {
Map<String, Object> server0Params = new HashMap<>();
server0 = createClusteredServerWithParams(isNetty(), 0, true, PAGE_SIZE, PAGE_MAX, server0Params);
Map<String, Object> server1Params = new HashMap<>();
addTargetParameters(server1Params);
server1 = createClusteredServerWithParams(isNetty(), 1, true, server1Params);
final String testAddress = "testAddress";
final String queueName0 = "queue0";
final String forwardAddress = "forwardAddress";
final String queueName1 = "queue1";
Map<String, TransportConfiguration> connectors = new HashMap<>();
TransportConfiguration server0tc = new TransportConfiguration(getConnector(), server0Params);
TransportConfiguration server1tc = new TransportConfiguration(getConnector(), server1Params);
connectors.put(server1tc.getName(), server1tc);
server0.getConfiguration().setConnectorConfigurations(connectors);
ArrayList<String> staticConnectors = new ArrayList<>();
staticConnectors.add(server1tc.getName());
int minLargeMessageSize = 1024 * 1024;
BridgeConfiguration bridgeConfiguration = new BridgeConfiguration()
.setName("bridge1")
.setQueueName(queueName0)
.setForwardingAddress(forwardAddress)
.setRetryInterval(1000)
.setReconnectAttemptsOnSameNode(-1)
.setUseDuplicateDetection(false)
.setConfirmationWindowSize(1024)
.setStaticConnectors(staticConnectors)
.setMinLargeMessageSize(minLargeMessageSize)
.setProducerWindowSize(minLargeMessageSize / 2);
List<BridgeConfiguration> bridgeConfigs = new ArrayList<>();
bridgeConfigs.add(bridgeConfiguration);
server0.getConfiguration().setBridgeConfigurations(bridgeConfigs);
CoreQueueConfiguration queueConfig0 = new CoreQueueConfiguration().setAddress(testAddress).setName(queueName0);
List<CoreQueueConfiguration> queueConfigs0 = new ArrayList<>();
queueConfigs0.add(queueConfig0);
server0.getConfiguration().setQueueConfigurations(queueConfigs0);
CoreQueueConfiguration queueConfig1 = new CoreQueueConfiguration()
.setAddress(forwardAddress)
.setName(queueName1);
List<CoreQueueConfiguration> queueConfigs1 = new ArrayList<>();
queueConfigs1.add(queueConfig1);
server1.getConfiguration().setQueueConfigurations(queueConfigs1);
server1.start();
server0.start();
locator = addServerLocator(ActiveMQClient.createServerLocatorWithoutHA(server0tc, server1tc));
ClientSessionFactory sf0 = locator.createSessionFactory(server0tc);
ClientSessionFactory sf1 = locator.createSessionFactory(server1tc);
ClientSession session0 = sf0.createSession(false, true, true);
ClientSession session1 = sf1.createSession(false, true, true);
ClientProducer producer0 = session0.createProducer(new SimpleString(testAddress));
ClientConsumer consumer1 = session1.createConsumer(queueName1);
session1.start();
//create a large message bigger than Integer.MAX_VALUE
final long largeMessageSize = Integer.MAX_VALUE + 1000L;
ClientMessage largeMessage = createLargeMessage(session0, largeMessageSize);
producer0.send(largeMessage);
session0.commit();
//check target queue for large message arriving
ClientSession.QueueQuery query = session1.queueQuery(new SimpleString(queueName1));
long messageCount = query.getMessageCount();
int count = 0;
//wait for 300 sec max
while (messageCount == 0 && count < 300) {
count++;
Thread.sleep(1000);
query = session1.queueQuery(new SimpleString(queueName1));
messageCount = query.getMessageCount();
}
if (messageCount == 0) {
fail("large message didn't arrived after 5 min!");
}
//receive the message
ClientMessage message = consumer1.receive(5000);
message.acknowledge();
File outputFile = new File(getTemporaryDir(), "huge_message_received.dat");
System.out.println("-----message save to: " + outputFile.getAbsolutePath());
FileOutputStream fileOutputStream = new FileOutputStream(outputFile);
BufferedOutputStream bufferedOutput = new BufferedOutputStream(fileOutputStream);
message.setOutputStream(bufferedOutput);
if (!message.waitOutputStreamCompletion(5 * 60 * 1000)) {
fail("message didn't get received to disk in 5 min. Is the machine slow?");
}
session1.commit();
Assert.assertNull(consumer1.receiveImmediate());
session0.close();
session1.close();
sf0.close();
sf1.close();
}
finally {
if (locator != null) {
locator.close();
}
try {
server0.stop();
}
catch (Throwable ignored) {
}
try {
server1.stop();
}
catch (Throwable ignored) {
}
}
assertEquals(0, loadQueues(server0).size());
}
private ClientMessage createLargeMessage(ClientSession session, long largeMessageSize) throws Exception {
File fileInput = new File(getTemporaryDir(), "huge_message_to_send.dat");
createFile(fileInput, largeMessageSize);
System.out.println("File created at: " + fileInput.getAbsolutePath());
ClientMessage message = session.createMessage(ClientMessage.BYTES_TYPE, true);
FileInputStream fileInputStream = new FileInputStream(fileInput);
BufferedInputStream bufferedInput = new BufferedInputStream(fileInputStream);
message.setBodyInputStream(bufferedInput);
return message;
}
private static void createFile(final File file, final long fileSize) throws IOException {
if (file.exists()) {
System.out.println("---file already there " + file.length());
return;
}
FileOutputStream fileOut = new FileOutputStream(file);
BufferedOutputStream buffOut = new BufferedOutputStream(fileOut);
byte[] outBuffer = new byte[1024 * 1024];
System.out.println(" --- creating file, size: " + fileSize);
for (long i = 0; i < fileSize; i += outBuffer.length) {
buffOut.write(outBuffer);
}
buffOut.close();
}
@Test
public void testNullForwardingAddress() throws Exception {
Map<String, Object> server0Params = new HashMap<>();

View File

@ -667,6 +667,7 @@ public class ActiveMQServerControlTest extends ManagementTestBase {
null, // filterString
ActiveMQClient.DEFAULT_RETRY_INTERVAL, ActiveMQClient.DEFAULT_RETRY_INTERVAL_MULTIPLIER, ActiveMQClient.INITIAL_CONNECT_ATTEMPTS, ActiveMQClient.DEFAULT_RECONNECT_ATTEMPTS, false, // duplicateDetection
1, // confirmationWindowSize
-1, // producerWindowSize
ActiveMQClient.DEFAULT_CLIENT_FAILURE_CHECK_PERIOD, connectorConfig.getName(), // liveConnector
false, false, null, null);