diff --git a/activemq-core-client/src/main/java/org/apache/activemq/api/config/ActiveMQDefaultConfiguration.java b/activemq-core-client/src/main/java/org/apache/activemq/api/config/ActiveMQDefaultConfiguration.java index 4049fb4ae8..cea76338db 100644 --- a/activemq-core-client/src/main/java/org/apache/activemq/api/config/ActiveMQDefaultConfiguration.java +++ b/activemq-core-client/src/main/java/org/apache/activemq/api/config/ActiveMQDefaultConfiguration.java @@ -309,6 +309,10 @@ public final class ActiveMQDefaultConfiguration // Once the bridge has received this many bytes, it sends a confirmation private static int DEFAULT_BRIDGE_CONFIRMATION_WINDOW_SIZE = 1048576; + // Producer flow control is disabled by default on the bridge + // You probably need to enable this if you use lots of huge messages + private static int DEFAULT_BRIDGE_PRODUCER_WINDOW_SIZE = -1; + // Upon reconnection this configures the number of time the same node on the topology will be retried before reseting the server locator and using the initial connectors private static int DEFAULT_BRIDGE_CONNECT_SAME_NODE = 10; @@ -870,6 +874,16 @@ public final class ActiveMQDefaultConfiguration return DEFAULT_BRIDGE_CONFIRMATION_WINDOW_SIZE; } + + /** + * This default is used for both bridge and cluster connections (since they both translate to bridges) * + * @return + */ + public static int getDefaultBridgeProducerWindowSize() + { + return DEFAULT_BRIDGE_PRODUCER_WINDOW_SIZE; + } + /** * Upon reconnection this configures the number of time the same node on the topology will be retried before reseting the server locator and using the initial connectors */ diff --git a/activemq-core-client/src/main/java/org/apache/activemq/core/client/impl/ClientProducerImpl.java b/activemq-core-client/src/main/java/org/apache/activemq/core/client/impl/ClientProducerImpl.java index 7623cdb1a8..3ff5faee8f 100644 --- a/activemq-core-client/src/main/java/org/apache/activemq/core/client/impl/ClientProducerImpl.java +++ b/activemq-core-client/src/main/java/org/apache/activemq/core/client/impl/ClientProducerImpl.java @@ -415,11 +415,11 @@ public class ClientProducerImpl implements ClientProducerInternal try { - for (int pos = 0; pos < bodySize; ) + for (long pos = 0; pos < bodySize; ) { final boolean lastChunk; - final int chunkLength = Math.min((int) (bodySize - pos), minLargeMessageSize); + final int chunkLength = (int)Math.min((bodySize - pos), (long)minLargeMessageSize); final ActiveMQBuffer bodyBuffer = ActiveMQBuffers.fixedBuffer(chunkLength); @@ -430,7 +430,7 @@ public class ClientProducerImpl implements ClientProducerInternal lastChunk = pos >= bodySize; SendAcknowledgementHandler messageHandler = lastChunk ? handler : null; - int creditsUsed = sessionContext.sendLargeMessageChunk(msgI, -1, sendBlocking, lastChunk, bodyBuffer.toByteBuffer().array(), messageHandler); + int creditsUsed = sessionContext.sendServerLargeMessageChunk(msgI, -1, sendBlocking, lastChunk, bodyBuffer.toByteBuffer().array(), messageHandler); try { diff --git a/activemq-core-client/src/main/java/org/apache/activemq/core/protocol/core/impl/ActiveMQSessionContext.java b/activemq-core-client/src/main/java/org/apache/activemq/core/protocol/core/impl/ActiveMQSessionContext.java index 8df00114cb..7097bd2aa0 100644 --- a/activemq-core-client/src/main/java/org/apache/activemq/core/protocol/core/impl/ActiveMQSessionContext.java +++ b/activemq-core-client/src/main/java/org/apache/activemq/core/protocol/core/impl/ActiveMQSessionContext.java @@ -109,6 +109,7 @@ import static org.apache.activemq.core.protocol.core.impl.PacketImpl.SESS_RECEIV public class ActiveMQSessionContext extends SessionContext { + private static final long MAX_RESENDCACHE_WAITING_TIME = 10000L;//10 sec private final Channel sessionChannel; private final int serverVersion; private int confirmationWindow; @@ -425,6 +426,27 @@ public class ActiveMQSessionContext extends SessionContext @Override public int sendLargeMessageChunk(MessageInternal msgI, long messageBodySize, boolean sendBlocking, boolean lastChunk, byte[] chunk, SendAcknowledgementHandler messageHandler) throws ActiveMQException + { + final boolean requiresResponse = lastChunk && sendBlocking; + final SessionSendContinuationMessage chunkPacket = + new SessionSendContinuationMessage(msgI, chunk, !lastChunk, + requiresResponse, messageBodySize, messageHandler); + + if (requiresResponse) + { + // When sending it blocking, only the last chunk will be blocking. + sessionChannel.sendBlocking(chunkPacket, PacketImpl.NULL_RESPONSE); + } + else + { + sessionChannel.send(chunkPacket); + } + + return chunkPacket.getPacketSize(); + } + + @Override + public int sendServerLargeMessageChunk(MessageInternal msgI, long messageBodySize, boolean sendBlocking, boolean lastChunk, byte[] chunk, SendAcknowledgementHandler messageHandler) throws ActiveMQException { final boolean requiresResponse = lastChunk && sendBlocking; final SessionSendContinuationMessage chunkPacket = diff --git a/activemq-core-client/src/main/java/org/apache/activemq/spi/core/remoting/SessionContext.java b/activemq-core-client/src/main/java/org/apache/activemq/spi/core/remoting/SessionContext.java index af0d4f3bae..c642837b21 100644 --- a/activemq-core-client/src/main/java/org/apache/activemq/spi/core/remoting/SessionContext.java +++ b/activemq-core-client/src/main/java/org/apache/activemq/spi/core/remoting/SessionContext.java @@ -149,6 +149,8 @@ public abstract class SessionContext public abstract int sendLargeMessageChunk(MessageInternal msgI, long messageBodySize, boolean sendBlocking, boolean lastChunk, byte[] chunk, SendAcknowledgementHandler messageHandler) throws ActiveMQException; + public abstract int sendServerLargeMessageChunk(MessageInternal msgI, long messageBodySize, boolean sendBlocking, boolean lastChunk, byte[] chunk, SendAcknowledgementHandler messageHandler) throws ActiveMQException; + public abstract void setSendAcknowledgementHandler(final SendAcknowledgementHandler handler); diff --git a/activemq-server/src/main/java/org/apache/activemq/core/config/BridgeConfiguration.java b/activemq-server/src/main/java/org/apache/activemq/core/config/BridgeConfiguration.java index 87fae42afa..65cb2a4866 100644 --- a/activemq-server/src/main/java/org/apache/activemq/core/config/BridgeConfiguration.java +++ b/activemq-server/src/main/java/org/apache/activemq/core/config/BridgeConfiguration.java @@ -56,6 +56,8 @@ public final class BridgeConfiguration implements Serializable private int confirmationWindowSize = ActiveMQClient.DEFAULT_CONFIRMATION_WINDOW_SIZE; + private int producerWindowSize = ActiveMQDefaultConfiguration.getDefaultBridgeProducerWindowSize(); + private long clientFailureCheckPeriod = ActiveMQClient.DEFAULT_CLIENT_FAILURE_CHECK_PERIOD; private String user = ActiveMQDefaultConfiguration.getDefaultClusterUser(); @@ -302,6 +304,19 @@ public final class BridgeConfiguration implements Serializable return this; } + /** The producer flow control on the birdge */ + public BridgeConfiguration setProducerWindowSize(final int producerWindowSize) + { + this.producerWindowSize = producerWindowSize; + return this; + } + + public int getProducerWindowSize() + { + return producerWindowSize; + + } + public long getClientFailureCheckPeriod() { return clientFailureCheckPeriod; diff --git a/activemq-server/src/main/java/org/apache/activemq/core/config/ClusterConnectionConfiguration.java b/activemq-server/src/main/java/org/apache/activemq/core/config/ClusterConnectionConfiguration.java index 3a6c3152f5..ea95dd8f86 100644 --- a/activemq-server/src/main/java/org/apache/activemq/core/config/ClusterConnectionConfiguration.java +++ b/activemq-server/src/main/java/org/apache/activemq/core/config/ClusterConnectionConfiguration.java @@ -63,6 +63,8 @@ public final class ClusterConnectionConfiguration implements Serializable private int confirmationWindowSize = ActiveMQDefaultConfiguration.getDefaultClusterConfirmationWindowSize(); + private int producerWindowSize = ActiveMQDefaultConfiguration.getDefaultBridgeProducerWindowSize(); + private boolean allowDirectConnectionsOnly = false; private int minLargeMessageSize = ActiveMQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE; @@ -198,6 +200,18 @@ public final class ClusterConnectionConfiguration implements Serializable return this; } + + public int getProducerWindowSize() + { + return producerWindowSize; + } + + public ClusterConnectionConfiguration setProducerindowSize(int producerWindowSize) + { + this.producerWindowSize = producerWindowSize; + return this; + } + public List getStaticConnectors() { return staticConnectors; diff --git a/activemq-server/src/main/java/org/apache/activemq/core/deployers/impl/FileConfigurationParser.java b/activemq-server/src/main/java/org/apache/activemq/core/deployers/impl/FileConfigurationParser.java index 775e60cbcf..3f7c637531 100644 --- a/activemq-server/src/main/java/org/apache/activemq/core/deployers/impl/FileConfigurationParser.java +++ b/activemq-server/src/main/java/org/apache/activemq/core/deployers/impl/FileConfigurationParser.java @@ -1417,6 +1417,10 @@ public final class FileConfigurationParser extends XMLConfigurationUtil getInteger(e, "confirmation-window-size", ActiveMQDefaultConfiguration.getDefaultClusterConfirmationWindowSize(), Validators.GT_ZERO); + int producerWindowSize = + getInteger(e, "producer-window-size", ActiveMQDefaultConfiguration.getDefaultBridgeConfirmationWindowSize(), + Validators.MINUS_ONE_OR_GT_ZERO); + long clusterNotificationInterval = getLong(e, "notification-interval", ActiveMQDefaultConfiguration.getDefaultClusterNotificationInterval(), Validators.GT_ZERO); int clusterNotificationAttempts = getInteger(e, "notification-attempts", ActiveMQDefaultConfiguration.getDefaultClusterNotificationAttempts(), Validators.GT_ZERO); @@ -1468,6 +1472,7 @@ public final class FileConfigurationParser extends XMLConfigurationUtil .setForwardWhenNoConsumers(forwardWhenNoConsumers) .setMaxHops(maxHops) .setConfirmationWindowSize(confirmationWindowSize) + .setProducerindowSize(producerWindowSize) .setAllowDirectConnectionsOnly(allowDirectConnectionsOnly) .setClusterNotificationInterval(clusterNotificationInterval) .setClusterNotificationAttempts(clusterNotificationAttempts); @@ -1549,6 +1554,10 @@ public final class FileConfigurationParser extends XMLConfigurationUtil getInteger(brNode, "reconnect-attempts-same-node", ActiveMQDefaultConfiguration.getDefaultBridgeConnectSameNode(), Validators.MINUS_ONE_OR_GE_ZERO); + int producerWindowSize = + getInteger(brNode, "producer-window-size", ActiveMQDefaultConfiguration.getDefaultBridgeProducerWindowSize(), + Validators.MINUS_ONE_OR_GE_ZERO); + boolean useDuplicateDetection = getBoolean(brNode, "use-duplicate-detection", ActiveMQDefaultConfiguration.isDefaultBridgeDuplicateDetection()); @@ -1630,7 +1639,8 @@ public final class FileConfigurationParser extends XMLConfigurationUtil .setConfirmationWindowSize(confirmationWindowSize) .setHA(ha) .setUser(user) - .setPassword(password); + .setPassword(password) + .setProducerWindowSize(producerWindowSize); if (!staticConnectorNames.isEmpty()) { diff --git a/activemq-server/src/main/java/org/apache/activemq/core/server/cluster/ClusterManager.java b/activemq-server/src/main/java/org/apache/activemq/core/server/cluster/ClusterManager.java index 24ed7c0127..a289e42f94 100644 --- a/activemq-server/src/main/java/org/apache/activemq/core/server/cluster/ClusterManager.java +++ b/activemq-server/src/main/java/org/apache/activemq/core/server/cluster/ClusterManager.java @@ -529,8 +529,7 @@ public final class ClusterManager implements ActiveMQComponent serverLocator.setBlockOnDurableSend(!config.isUseDuplicateDetection()); serverLocator.setBlockOnNonDurableSend(!config.isUseDuplicateDetection()); serverLocator.setMinLargeMessageSize(config.getMinLargeMessageSize()); - //disable flow control - serverLocator.setProducerWindowSize(-1); + serverLocator.setProducerWindowSize(config.getProducerWindowSize()); // This will be set to 30s unless it's changed from embedded / testing // there is no reason to exception the config for this timeout @@ -735,6 +734,7 @@ public final class ClusterManager implements ActiveMQComponent config.isDuplicateDetection(), config.isForwardWhenNoConsumers(), config.getConfirmationWindowSize(), + config.getProducerWindowSize(), executorFactory, server, postOffice, @@ -777,6 +777,7 @@ public final class ClusterManager implements ActiveMQComponent config.isDuplicateDetection(), config.isForwardWhenNoConsumers(), config.getConfirmationWindowSize(), + config.getProducerWindowSize(), executorFactory, server, postOffice, diff --git a/activemq-server/src/main/java/org/apache/activemq/core/server/cluster/impl/ClusterConnectionImpl.java b/activemq-server/src/main/java/org/apache/activemq/core/server/cluster/impl/ClusterConnectionImpl.java index 9ff99485f2..c02ed572e4 100644 --- a/activemq-server/src/main/java/org/apache/activemq/core/server/cluster/impl/ClusterConnectionImpl.java +++ b/activemq-server/src/main/java/org/apache/activemq/core/server/cluster/impl/ClusterConnectionImpl.java @@ -113,6 +113,8 @@ public final class ClusterConnectionImpl implements ClusterConnection, AfterConn private final int confirmationWindowSize; + private final int producerWindowSize; + /** * Guard for the field {@link #records}. Note that the field is {@link ConcurrentHashMap}, * however we need the guard to synchronize multiple step operations during topology updates. @@ -179,6 +181,7 @@ public final class ClusterConnectionImpl implements ClusterConnection, AfterConn final boolean useDuplicateDetection, final boolean routeWhenNoConsumers, final int confirmationWindowSize, + final int producerWindowSize, final ExecutorFactory executorFactory, final ActiveMQServer server, final PostOffice postOffice, @@ -220,6 +223,8 @@ public final class ClusterConnectionImpl implements ClusterConnection, AfterConn this.confirmationWindowSize = confirmationWindowSize; + this.producerWindowSize = producerWindowSize; + this.executorFactory = executorFactory; this.clusterNotificationInterval = clusterNotificationInterval; @@ -286,6 +291,7 @@ public final class ClusterConnectionImpl implements ClusterConnection, AfterConn final boolean useDuplicateDetection, final boolean routeWhenNoConsumers, final int confirmationWindowSize, + final int producerWindowSize, final ExecutorFactory executorFactory, final ActiveMQServer server, final PostOffice postOffice, @@ -333,6 +339,8 @@ public final class ClusterConnectionImpl implements ClusterConnection, AfterConn this.confirmationWindowSize = confirmationWindowSize; + this.producerWindowSize = producerWindowSize; + this.executorFactory = executorFactory; this.clusterNotificationInterval = clusterNotificationInterval; @@ -637,8 +645,7 @@ public final class ClusterConnectionImpl implements ClusterConnection, AfterConn serverLocator.setBlockOnNonDurableSend(!useDuplicateDetection); serverLocator.setCallTimeout(callTimeout); serverLocator.setCallFailoverTimeout(callFailoverTimeout); - // No producer flow control on the bridges, as we don't want to lock the queues - serverLocator.setProducerWindowSize(-1); + serverLocator.setProducerWindowSize(producerWindowSize); if (retryInterval > 0) { diff --git a/activemq-server/src/main/resources/schema/activemq-configuration.xsd b/activemq-server/src/main/resources/schema/activemq-configuration.xsd index 10ab978d70..31b840cb60 100644 --- a/activemq-server/src/main/resources/schema/activemq-configuration.xsd +++ b/activemq-server/src/main/resources/schema/activemq-configuration.xsd @@ -1,4 +1,4 @@ - +