This closes #211 large message over bridge

This commit is contained in:
jbertram 2015-04-23 15:02:19 -05:00
commit f1faacb112
17 changed files with 363 additions and 28 deletions

View File

@ -309,6 +309,10 @@ public final class ActiveMQDefaultConfiguration
// Once the bridge has received this many bytes, it sends a confirmation // Once the bridge has received this many bytes, it sends a confirmation
private static int DEFAULT_BRIDGE_CONFIRMATION_WINDOW_SIZE = 1048576; private static int DEFAULT_BRIDGE_CONFIRMATION_WINDOW_SIZE = 1048576;
// Producer flow control is disabled by default on the bridge
// You probably need to enable this if you use lots of huge messages
private static int DEFAULT_BRIDGE_PRODUCER_WINDOW_SIZE = -1;
// Upon reconnection this configures the number of time the same node on the topology will be retried before reseting the server locator and using the initial connectors // Upon reconnection this configures the number of time the same node on the topology will be retried before reseting the server locator and using the initial connectors
private static int DEFAULT_BRIDGE_CONNECT_SAME_NODE = 10; private static int DEFAULT_BRIDGE_CONNECT_SAME_NODE = 10;
@ -870,6 +874,16 @@ public final class ActiveMQDefaultConfiguration
return DEFAULT_BRIDGE_CONFIRMATION_WINDOW_SIZE; return DEFAULT_BRIDGE_CONFIRMATION_WINDOW_SIZE;
} }
/**
* This default is used for both bridge and cluster connections (since they both translate to bridges) *
* @return
*/
public static int getDefaultBridgeProducerWindowSize()
{
return DEFAULT_BRIDGE_PRODUCER_WINDOW_SIZE;
}
/** /**
* Upon reconnection this configures the number of time the same node on the topology will be retried before reseting the server locator and using the initial connectors * Upon reconnection this configures the number of time the same node on the topology will be retried before reseting the server locator and using the initial connectors
*/ */

View File

@ -415,11 +415,11 @@ public class ClientProducerImpl implements ClientProducerInternal
try try
{ {
for (int pos = 0; pos < bodySize; ) for (long pos = 0; pos < bodySize; )
{ {
final boolean lastChunk; final boolean lastChunk;
final int chunkLength = Math.min((int) (bodySize - pos), minLargeMessageSize); final int chunkLength = (int)Math.min((bodySize - pos), (long)minLargeMessageSize);
final ActiveMQBuffer bodyBuffer = ActiveMQBuffers.fixedBuffer(chunkLength); final ActiveMQBuffer bodyBuffer = ActiveMQBuffers.fixedBuffer(chunkLength);
@ -430,7 +430,7 @@ public class ClientProducerImpl implements ClientProducerInternal
lastChunk = pos >= bodySize; lastChunk = pos >= bodySize;
SendAcknowledgementHandler messageHandler = lastChunk ? handler : null; SendAcknowledgementHandler messageHandler = lastChunk ? handler : null;
int creditsUsed = sessionContext.sendLargeMessageChunk(msgI, -1, sendBlocking, lastChunk, bodyBuffer.toByteBuffer().array(), messageHandler); int creditsUsed = sessionContext.sendServerLargeMessageChunk(msgI, -1, sendBlocking, lastChunk, bodyBuffer.toByteBuffer().array(), messageHandler);
try try
{ {

View File

@ -109,6 +109,7 @@ import static org.apache.activemq.core.protocol.core.impl.PacketImpl.SESS_RECEIV
public class ActiveMQSessionContext extends SessionContext public class ActiveMQSessionContext extends SessionContext
{ {
private static final long MAX_RESENDCACHE_WAITING_TIME = 10000L;//10 sec
private final Channel sessionChannel; private final Channel sessionChannel;
private final int serverVersion; private final int serverVersion;
private int confirmationWindow; private int confirmationWindow;
@ -425,6 +426,27 @@ public class ActiveMQSessionContext extends SessionContext
@Override @Override
public int sendLargeMessageChunk(MessageInternal msgI, long messageBodySize, boolean sendBlocking, boolean lastChunk, byte[] chunk, SendAcknowledgementHandler messageHandler) throws ActiveMQException public int sendLargeMessageChunk(MessageInternal msgI, long messageBodySize, boolean sendBlocking, boolean lastChunk, byte[] chunk, SendAcknowledgementHandler messageHandler) throws ActiveMQException
{
final boolean requiresResponse = lastChunk && sendBlocking;
final SessionSendContinuationMessage chunkPacket =
new SessionSendContinuationMessage(msgI, chunk, !lastChunk,
requiresResponse, messageBodySize, messageHandler);
if (requiresResponse)
{
// When sending it blocking, only the last chunk will be blocking.
sessionChannel.sendBlocking(chunkPacket, PacketImpl.NULL_RESPONSE);
}
else
{
sessionChannel.send(chunkPacket);
}
return chunkPacket.getPacketSize();
}
@Override
public int sendServerLargeMessageChunk(MessageInternal msgI, long messageBodySize, boolean sendBlocking, boolean lastChunk, byte[] chunk, SendAcknowledgementHandler messageHandler) throws ActiveMQException
{ {
final boolean requiresResponse = lastChunk && sendBlocking; final boolean requiresResponse = lastChunk && sendBlocking;
final SessionSendContinuationMessage chunkPacket = final SessionSendContinuationMessage chunkPacket =

View File

@ -149,6 +149,8 @@ public abstract class SessionContext
public abstract int sendLargeMessageChunk(MessageInternal msgI, long messageBodySize, boolean sendBlocking, boolean lastChunk, byte[] chunk, SendAcknowledgementHandler messageHandler) throws ActiveMQException; public abstract int sendLargeMessageChunk(MessageInternal msgI, long messageBodySize, boolean sendBlocking, boolean lastChunk, byte[] chunk, SendAcknowledgementHandler messageHandler) throws ActiveMQException;
public abstract int sendServerLargeMessageChunk(MessageInternal msgI, long messageBodySize, boolean sendBlocking, boolean lastChunk, byte[] chunk, SendAcknowledgementHandler messageHandler) throws ActiveMQException;
public abstract void setSendAcknowledgementHandler(final SendAcknowledgementHandler handler); public abstract void setSendAcknowledgementHandler(final SendAcknowledgementHandler handler);

View File

@ -56,6 +56,8 @@ public final class BridgeConfiguration implements Serializable
private int confirmationWindowSize = ActiveMQClient.DEFAULT_CONFIRMATION_WINDOW_SIZE; private int confirmationWindowSize = ActiveMQClient.DEFAULT_CONFIRMATION_WINDOW_SIZE;
private int producerWindowSize = ActiveMQDefaultConfiguration.getDefaultBridgeProducerWindowSize();
private long clientFailureCheckPeriod = ActiveMQClient.DEFAULT_CLIENT_FAILURE_CHECK_PERIOD; private long clientFailureCheckPeriod = ActiveMQClient.DEFAULT_CLIENT_FAILURE_CHECK_PERIOD;
private String user = ActiveMQDefaultConfiguration.getDefaultClusterUser(); private String user = ActiveMQDefaultConfiguration.getDefaultClusterUser();
@ -302,6 +304,19 @@ public final class BridgeConfiguration implements Serializable
return this; return this;
} }
/** The producer flow control on the birdge */
public BridgeConfiguration setProducerWindowSize(final int producerWindowSize)
{
this.producerWindowSize = producerWindowSize;
return this;
}
public int getProducerWindowSize()
{
return producerWindowSize;
}
public long getClientFailureCheckPeriod() public long getClientFailureCheckPeriod()
{ {
return clientFailureCheckPeriod; return clientFailureCheckPeriod;

View File

@ -63,6 +63,8 @@ public final class ClusterConnectionConfiguration implements Serializable
private int confirmationWindowSize = ActiveMQDefaultConfiguration.getDefaultClusterConfirmationWindowSize(); private int confirmationWindowSize = ActiveMQDefaultConfiguration.getDefaultClusterConfirmationWindowSize();
private int producerWindowSize = ActiveMQDefaultConfiguration.getDefaultBridgeProducerWindowSize();
private boolean allowDirectConnectionsOnly = false; private boolean allowDirectConnectionsOnly = false;
private int minLargeMessageSize = ActiveMQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE; private int minLargeMessageSize = ActiveMQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE;
@ -198,6 +200,18 @@ public final class ClusterConnectionConfiguration implements Serializable
return this; return this;
} }
public int getProducerWindowSize()
{
return producerWindowSize;
}
public ClusterConnectionConfiguration setProducerindowSize(int producerWindowSize)
{
this.producerWindowSize = producerWindowSize;
return this;
}
public List<String> getStaticConnectors() public List<String> getStaticConnectors()
{ {
return staticConnectors; return staticConnectors;

View File

@ -1417,6 +1417,10 @@ public final class FileConfigurationParser extends XMLConfigurationUtil
getInteger(e, "confirmation-window-size", ActiveMQDefaultConfiguration.getDefaultClusterConfirmationWindowSize(), getInteger(e, "confirmation-window-size", ActiveMQDefaultConfiguration.getDefaultClusterConfirmationWindowSize(),
Validators.GT_ZERO); Validators.GT_ZERO);
int producerWindowSize =
getInteger(e, "producer-window-size", ActiveMQDefaultConfiguration.getDefaultBridgeConfirmationWindowSize(),
Validators.MINUS_ONE_OR_GT_ZERO);
long clusterNotificationInterval = getLong(e, "notification-interval", ActiveMQDefaultConfiguration.getDefaultClusterNotificationInterval(), Validators.GT_ZERO); long clusterNotificationInterval = getLong(e, "notification-interval", ActiveMQDefaultConfiguration.getDefaultClusterNotificationInterval(), Validators.GT_ZERO);
int clusterNotificationAttempts = getInteger(e, "notification-attempts", ActiveMQDefaultConfiguration.getDefaultClusterNotificationAttempts(), Validators.GT_ZERO); int clusterNotificationAttempts = getInteger(e, "notification-attempts", ActiveMQDefaultConfiguration.getDefaultClusterNotificationAttempts(), Validators.GT_ZERO);
@ -1468,6 +1472,7 @@ public final class FileConfigurationParser extends XMLConfigurationUtil
.setForwardWhenNoConsumers(forwardWhenNoConsumers) .setForwardWhenNoConsumers(forwardWhenNoConsumers)
.setMaxHops(maxHops) .setMaxHops(maxHops)
.setConfirmationWindowSize(confirmationWindowSize) .setConfirmationWindowSize(confirmationWindowSize)
.setProducerindowSize(producerWindowSize)
.setAllowDirectConnectionsOnly(allowDirectConnectionsOnly) .setAllowDirectConnectionsOnly(allowDirectConnectionsOnly)
.setClusterNotificationInterval(clusterNotificationInterval) .setClusterNotificationInterval(clusterNotificationInterval)
.setClusterNotificationAttempts(clusterNotificationAttempts); .setClusterNotificationAttempts(clusterNotificationAttempts);
@ -1549,6 +1554,10 @@ public final class FileConfigurationParser extends XMLConfigurationUtil
getInteger(brNode, "reconnect-attempts-same-node", ActiveMQDefaultConfiguration.getDefaultBridgeConnectSameNode(), getInteger(brNode, "reconnect-attempts-same-node", ActiveMQDefaultConfiguration.getDefaultBridgeConnectSameNode(),
Validators.MINUS_ONE_OR_GE_ZERO); Validators.MINUS_ONE_OR_GE_ZERO);
int producerWindowSize =
getInteger(brNode, "producer-window-size", ActiveMQDefaultConfiguration.getDefaultBridgeProducerWindowSize(),
Validators.MINUS_ONE_OR_GE_ZERO);
boolean useDuplicateDetection = getBoolean(brNode, boolean useDuplicateDetection = getBoolean(brNode,
"use-duplicate-detection", "use-duplicate-detection",
ActiveMQDefaultConfiguration.isDefaultBridgeDuplicateDetection()); ActiveMQDefaultConfiguration.isDefaultBridgeDuplicateDetection());
@ -1630,7 +1639,8 @@ public final class FileConfigurationParser extends XMLConfigurationUtil
.setConfirmationWindowSize(confirmationWindowSize) .setConfirmationWindowSize(confirmationWindowSize)
.setHA(ha) .setHA(ha)
.setUser(user) .setUser(user)
.setPassword(password); .setPassword(password)
.setProducerWindowSize(producerWindowSize);
if (!staticConnectorNames.isEmpty()) if (!staticConnectorNames.isEmpty())
{ {

View File

@ -529,8 +529,7 @@ public final class ClusterManager implements ActiveMQComponent
serverLocator.setBlockOnDurableSend(!config.isUseDuplicateDetection()); serverLocator.setBlockOnDurableSend(!config.isUseDuplicateDetection());
serverLocator.setBlockOnNonDurableSend(!config.isUseDuplicateDetection()); serverLocator.setBlockOnNonDurableSend(!config.isUseDuplicateDetection());
serverLocator.setMinLargeMessageSize(config.getMinLargeMessageSize()); serverLocator.setMinLargeMessageSize(config.getMinLargeMessageSize());
//disable flow control serverLocator.setProducerWindowSize(config.getProducerWindowSize());
serverLocator.setProducerWindowSize(-1);
// This will be set to 30s unless it's changed from embedded / testing // This will be set to 30s unless it's changed from embedded / testing
// there is no reason to exception the config for this timeout // there is no reason to exception the config for this timeout
@ -735,6 +734,7 @@ public final class ClusterManager implements ActiveMQComponent
config.isDuplicateDetection(), config.isDuplicateDetection(),
config.isForwardWhenNoConsumers(), config.isForwardWhenNoConsumers(),
config.getConfirmationWindowSize(), config.getConfirmationWindowSize(),
config.getProducerWindowSize(),
executorFactory, executorFactory,
server, server,
postOffice, postOffice,
@ -777,6 +777,7 @@ public final class ClusterManager implements ActiveMQComponent
config.isDuplicateDetection(), config.isDuplicateDetection(),
config.isForwardWhenNoConsumers(), config.isForwardWhenNoConsumers(),
config.getConfirmationWindowSize(), config.getConfirmationWindowSize(),
config.getProducerWindowSize(),
executorFactory, executorFactory,
server, server,
postOffice, postOffice,

View File

@ -113,6 +113,8 @@ public final class ClusterConnectionImpl implements ClusterConnection, AfterConn
private final int confirmationWindowSize; private final int confirmationWindowSize;
private final int producerWindowSize;
/** /**
* Guard for the field {@link #records}. Note that the field is {@link ConcurrentHashMap}, * Guard for the field {@link #records}. Note that the field is {@link ConcurrentHashMap},
* however we need the guard to synchronize multiple step operations during topology updates. * however we need the guard to synchronize multiple step operations during topology updates.
@ -179,6 +181,7 @@ public final class ClusterConnectionImpl implements ClusterConnection, AfterConn
final boolean useDuplicateDetection, final boolean useDuplicateDetection,
final boolean routeWhenNoConsumers, final boolean routeWhenNoConsumers,
final int confirmationWindowSize, final int confirmationWindowSize,
final int producerWindowSize,
final ExecutorFactory executorFactory, final ExecutorFactory executorFactory,
final ActiveMQServer server, final ActiveMQServer server,
final PostOffice postOffice, final PostOffice postOffice,
@ -220,6 +223,8 @@ public final class ClusterConnectionImpl implements ClusterConnection, AfterConn
this.confirmationWindowSize = confirmationWindowSize; this.confirmationWindowSize = confirmationWindowSize;
this.producerWindowSize = producerWindowSize;
this.executorFactory = executorFactory; this.executorFactory = executorFactory;
this.clusterNotificationInterval = clusterNotificationInterval; this.clusterNotificationInterval = clusterNotificationInterval;
@ -286,6 +291,7 @@ public final class ClusterConnectionImpl implements ClusterConnection, AfterConn
final boolean useDuplicateDetection, final boolean useDuplicateDetection,
final boolean routeWhenNoConsumers, final boolean routeWhenNoConsumers,
final int confirmationWindowSize, final int confirmationWindowSize,
final int producerWindowSize,
final ExecutorFactory executorFactory, final ExecutorFactory executorFactory,
final ActiveMQServer server, final ActiveMQServer server,
final PostOffice postOffice, final PostOffice postOffice,
@ -333,6 +339,8 @@ public final class ClusterConnectionImpl implements ClusterConnection, AfterConn
this.confirmationWindowSize = confirmationWindowSize; this.confirmationWindowSize = confirmationWindowSize;
this.producerWindowSize = producerWindowSize;
this.executorFactory = executorFactory; this.executorFactory = executorFactory;
this.clusterNotificationInterval = clusterNotificationInterval; this.clusterNotificationInterval = clusterNotificationInterval;
@ -637,8 +645,7 @@ public final class ClusterConnectionImpl implements ClusterConnection, AfterConn
serverLocator.setBlockOnNonDurableSend(!useDuplicateDetection); serverLocator.setBlockOnNonDurableSend(!useDuplicateDetection);
serverLocator.setCallTimeout(callTimeout); serverLocator.setCallTimeout(callTimeout);
serverLocator.setCallFailoverTimeout(callFailoverTimeout); serverLocator.setCallFailoverTimeout(callFailoverTimeout);
// No producer flow control on the bridges, as we don't want to lock the queues serverLocator.setProducerWindowSize(producerWindowSize);
serverLocator.setProducerWindowSize(-1);
if (retryInterval > 0) if (retryInterval > 0)
{ {

View File

@ -1,4 +1,4 @@
<?xml version="1.0" encoding="UTF-8"?> <?xml version="1.0" encoding="UTF-8"?>
<!-- <!--
Licensed to the Apache Software Foundation (ASF) under one or more Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with contributor license agreements. See the NOTICE file distributed with
@ -287,7 +287,7 @@
</xsd:documentation> </xsd:documentation>
</xsd:annotation> </xsd:annotation>
</xsd:element> </xsd:element>
<xsd:element name="remoting-incoming-interceptors" type="class-name-sequenceType" maxOccurs="1" minOccurs="0"> <xsd:element name="remoting-incoming-interceptors" type="class-name-sequenceType" maxOccurs="1" minOccurs="0">
<xsd:annotation> <xsd:annotation>
<xsd:documentation> <xsd:documentation>
@ -1049,6 +1049,14 @@
</xsd:annotation> </xsd:annotation>
</xsd:element> </xsd:element>
<xsd:element name="producer-window-size" type="xsd:int" maxOccurs="1" minOccurs="0" default="1048576">
<xsd:annotation>
<xsd:documentation>
Producer flow control on the bridge (default disabled = -1)
</xsd:documentation>
</xsd:annotation>
</xsd:element>
<xsd:element name="user" type="xsd:string" maxOccurs="1" minOccurs="0"> <xsd:element name="user" type="xsd:string" maxOccurs="1" minOccurs="0">
<xsd:annotation> <xsd:annotation>
<xsd:documentation> <xsd:documentation>
@ -1231,6 +1239,14 @@
</xsd:annotation> </xsd:annotation>
</xsd:element> </xsd:element>
<xsd:element name="producer-window-size" type="xsd:int" maxOccurs="1" minOccurs="0" default="1048576">
<xsd:annotation>
<xsd:documentation>
Producer flow control on the bridge (default disabled = -1)
</xsd:documentation>
</xsd:annotation>
</xsd:element>
<xsd:element name="call-failover-timeout" type="xsd:long" default="-1" maxOccurs="1" minOccurs="0"> <xsd:element name="call-failover-timeout" type="xsd:long" default="-1" maxOccurs="1" minOccurs="0">
<xsd:annotation> <xsd:annotation>
<xsd:documentation> <xsd:documentation>

View File

@ -223,6 +223,7 @@ public class FileConfigurationTest extends ConfigurationImplTest
Assert.assertEquals(true, bc.isUseDuplicateDetection()); Assert.assertEquals(true, bc.isUseDuplicateDetection());
Assert.assertEquals("connector1", bc.getStaticConnectors().get(0)); Assert.assertEquals("connector1", bc.getStaticConnectors().get(0));
Assert.assertEquals(null, bc.getDiscoveryGroupName()); Assert.assertEquals(null, bc.getDiscoveryGroupName());
Assert.assertEquals(444, bc.getProducerWindowSize());
} }
else else
{ {
@ -233,6 +234,7 @@ public class FileConfigurationTest extends ConfigurationImplTest
Assert.assertEquals(null, bc.getTransformerClassName()); Assert.assertEquals(null, bc.getTransformerClassName());
Assert.assertEquals(null, bc.getStaticConnectors()); Assert.assertEquals(null, bc.getStaticConnectors());
Assert.assertEquals("dg1", bc.getDiscoveryGroupName()); Assert.assertEquals("dg1", bc.getDiscoveryGroupName());
Assert.assertEquals(555, bc.getProducerWindowSize());
} }
} }
@ -267,6 +269,7 @@ public class FileConfigurationTest extends ConfigurationImplTest
Assert.assertEquals("connector1", ccc.getStaticConnectors().get(0)); Assert.assertEquals("connector1", ccc.getStaticConnectors().get(0));
Assert.assertEquals("connector2", ccc.getStaticConnectors().get(1)); Assert.assertEquals("connector2", ccc.getStaticConnectors().get(1));
Assert.assertEquals(null, ccc.getDiscoveryGroupName()); Assert.assertEquals(null, ccc.getDiscoveryGroupName());
Assert.assertEquals(222, ccc.getProducerWindowSize());
} }
else else
{ {
@ -280,6 +283,7 @@ public class FileConfigurationTest extends ConfigurationImplTest
Assert.assertEquals(2, ccc.getMaxHops()); Assert.assertEquals(2, ccc.getMaxHops());
Assert.assertEquals(Collections.emptyList(), ccc.getStaticConnectors()); Assert.assertEquals(Collections.emptyList(), ccc.getStaticConnectors());
Assert.assertEquals("dg1", ccc.getDiscoveryGroupName()); Assert.assertEquals("dg1", ccc.getDiscoveryGroupName());
Assert.assertEquals(333, ccc.getProducerWindowSize());
} }
} }

View File

@ -142,6 +142,7 @@
<reconnect-attempts>2</reconnect-attempts> <reconnect-attempts>2</reconnect-attempts>
<failover-on-server-shutdown>false</failover-on-server-shutdown> <failover-on-server-shutdown>false</failover-on-server-shutdown>
<use-duplicate-detection>true</use-duplicate-detection> <use-duplicate-detection>true</use-duplicate-detection>
<producer-window-size>444</producer-window-size>
<static-connectors> <static-connectors>
<connector-ref>connector1</connector-ref> <connector-ref>connector1</connector-ref>
</static-connectors> </static-connectors>
@ -149,6 +150,7 @@
<bridge name="bridge2"> <bridge name="bridge2">
<queue-name>queue2</queue-name> <queue-name>queue2</queue-name>
<forwarding-address>bridge-forwarding-address2</forwarding-address> <forwarding-address>bridge-forwarding-address2</forwarding-address>
<producer-window-size>555</producer-window-size>
<discovery-group-ref discovery-group-name="dg1"/> <discovery-group-ref discovery-group-name="dg1"/>
</bridge> </bridge>
</bridges> </bridges>
@ -167,24 +169,25 @@
</ha-policy> </ha-policy>
<cluster-connections> <cluster-connections>
<cluster-connection name="cluster-connection1"> <cluster-connection name="cluster-connection1">
<address>queues1</address> <address>queues1</address>
<connector-ref>connector1</connector-ref>
<check-period>331</check-period>
<connection-ttl>3370</connection-ttl>
<min-large-message-size>321</min-large-message-size>
<call-timeout>123</call-timeout>
<retry-interval>3</retry-interval>
<retry-interval-multiplier>0.25</retry-interval-multiplier>
<max-retry-interval>10000</max-retry-interval>
<reconnect-attempts>72</reconnect-attempts>
<use-duplicate-detection>true</use-duplicate-detection>
<forward-when-no-consumers>false</forward-when-no-consumers>
<max-hops>1</max-hops>
<call-failover-timeout>123</call-failover-timeout>
<static-connectors>
<connector-ref>connector1</connector-ref> <connector-ref>connector1</connector-ref>
<connector-ref>connector2</connector-ref> <check-period>331</check-period>
</static-connectors> <connection-ttl>3370</connection-ttl>
<min-large-message-size>321</min-large-message-size>
<call-timeout>123</call-timeout>
<retry-interval>3</retry-interval>
<retry-interval-multiplier>0.25</retry-interval-multiplier>
<max-retry-interval>10000</max-retry-interval>
<reconnect-attempts>72</reconnect-attempts>
<use-duplicate-detection>true</use-duplicate-detection>
<forward-when-no-consumers>false</forward-when-no-consumers>
<max-hops>1</max-hops>
<producer-window-size>222</producer-window-size>
<call-failover-timeout>123</call-failover-timeout>
<static-connectors>
<connector-ref>connector1</connector-ref>
<connector-ref>connector2</connector-ref>
</static-connectors>
</cluster-connection> </cluster-connection>
<cluster-connection name="cluster-connection2"> <cluster-connection name="cluster-connection2">
<address>queues2</address> <address>queues2</address>
@ -194,6 +197,7 @@
<use-duplicate-detection>false</use-duplicate-detection> <use-duplicate-detection>false</use-duplicate-detection>
<forward-when-no-consumers>true</forward-when-no-consumers> <forward-when-no-consumers>true</forward-when-no-consumers>
<max-hops>2</max-hops> <max-hops>2</max-hops>
<producer-window-size>333</producer-window-size>
<call-failover-timeout>456</call-failover-timeout> <call-failover-timeout>456</call-failover-timeout>
<discovery-group-ref discovery-group-name="dg1"/> <discovery-group-ref discovery-group-name="dg1"/>
</cluster-connection> </cluster-connection>

View File

@ -736,6 +736,10 @@ specified. The following shows all the available configuration options
server has received `confirmation-window-size` bytes it notifies its server has received `confirmation-window-size` bytes it notifies its
client, default is 1048576. A value of -1 means no window. client, default is 1048576. A value of -1 means no window.
- `producer-window-size`. The size for producer flow control over cluster connection.
it's by default disabled through the cluster connection bridge but you may want
to set a value if you are using really large messages in cluster. A value of -1 means no window.
- `call-failover-timeout`. Similar to `call-timeout` but used when a - `call-failover-timeout`. Similar to `call-timeout` but used when a
call is made during a failover attempt. Default is -1 (no timeout). call is made during a failover attempt. Default is -1 (no timeout).

View File

@ -127,6 +127,7 @@ Name | Description
[reconnect-attempts](core-bridges.md "Chapter 36. Core Bridges") | maximum number of retry attempts, -1 means 'no limits'. default -1 [reconnect-attempts](core-bridges.md "Chapter 36. Core Bridges") | maximum number of retry attempts, -1 means 'no limits'. default -1
[use-duplicate-detection](core-bridges.md "Chapter 36. Core Bridges") | forward duplicate detection headers?. default true [use-duplicate-detection](core-bridges.md "Chapter 36. Core Bridges") | forward duplicate detection headers?. default true
[confirmation-window-size](core-bridges.md "Chapter 36. Core Bridges") | number of bytes before confirmations are sent. default 1MB [confirmation-window-size](core-bridges.md "Chapter 36. Core Bridges") | number of bytes before confirmations are sent. default 1MB
[producer-window-size](core-bridges.md "Chapter 36. Core Bridges") | Producer flow control size on the bridge. Default -1 (disabled)
[user](core-bridges.md "Chapter 36. Core Bridges") | Username for the bridge, the default is the cluster username [user](core-bridges.md "Chapter 36. Core Bridges") | Username for the bridge, the default is the cluster username
[password](core-bridges.md "Chapter 36. Core Bridges") | Password for the bridge, default is the cluster password [password](core-bridges.md "Chapter 36. Core Bridges") | Password for the bridge, default is the cluster password
[reconnect-attempts-same-node](core-bridges.md "Chapter 36. Core Bridges") | Number of retries before trying another node. default 10 [reconnect-attempts-same-node](core-bridges.md "Chapter 36. Core Bridges") | Number of retries before trying another node. default 10
@ -165,6 +166,7 @@ Name | Description
[forward-when-no-consumers](clusters.md "Chapter 38. Clusters") | should messages be load balanced if there are no matching consumers on target? Default=false [forward-when-no-consumers](clusters.md "Chapter 38. Clusters") | should messages be load balanced if there are no matching consumers on target? Default=false
[max-hops](clusters.md "Chapter 38. Clusters") | maximum number of hops cluster topology is propagated. Default=1 [max-hops](clusters.md "Chapter 38. Clusters") | maximum number of hops cluster topology is propagated. Default=1
[confirmation-window-size](client-reconnection.md "Chapter 34. Client Reconnection and Session Reattachment")| The size (in bytes) of the window used for confirming data from the server connected to. Default 1048576 [confirmation-window-size](client-reconnection.md "Chapter 34. Client Reconnection and Session Reattachment")| The size (in bytes) of the window used for confirming data from the server connected to. Default 1048576
[producer-window-size](clusters.md "Chapter 38. Clusters") | Flow Control for the Cluster connection bridge. Default -1 (disabled)
[call-failover-timeout](clusters.md "38.3.1. Configuring Cluster Connections") | How long to wait for a reply if in the middle of a fail-over. -1 means wait forever. Default -1 [call-failover-timeout](clusters.md "38.3.1. Configuring Cluster Connections") | How long to wait for a reply if in the middle of a fail-over. -1 means wait forever. Default -1
[notification-interval](clusters.md "Chapter 38. Clusters") | how often the cluster connection will notify the cluster of its existence right after joining the cluster. Default 1000 [notification-interval](clusters.md "Chapter 38. Clusters") | how often the cluster connection will notify the cluster of its existence right after joining the cluster. Default 1000
[notification-attempts](clusters.md "Chapter 38. Clusters") | how many times this cluster connection will notify the cluster of its existence right after joining the cluster Default 2 [notification-attempts](clusters.md "Chapter 38. Clusters") | how many times this cluster connection will notify the cluster of its existence right after joining the cluster Default 2

View File

@ -195,6 +195,12 @@ Let's take a look at all the parameters in turn:
> `confirmation-window-size` is less than or equal to > `confirmation-window-size` is less than or equal to
> `max-size-bytes` to prevent the flow of messages from ceasing. > `max-size-bytes` to prevent the flow of messages from ceasing.
- `producer-window-size`. This optional parameter determines the
producer flow control through the bridge. You usually leave this off
unless you are dealing with huge large messages.
Default=-1 (disabled)
- `user`. This optional parameter determines the user name to use when - `user`. This optional parameter determines the user name to use when
creating the bridge connection to the remote server. If it is not creating the bridge connection to the remote server. If it is not
specified the default cluster user specified by `cluster-user` in specified the default cluster user specified by `cluster-user` in

View File

@ -16,6 +16,12 @@
*/ */
package org.apache.activemq.tests.integration.cluster.bridge; package org.apache.activemq.tests.integration.cluster.bridge;
import java.io.BufferedInputStream;
import java.io.BufferedOutputStream;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Arrays; import java.util.Arrays;
import java.util.Collection; import java.util.Collection;
@ -1753,6 +1759,214 @@ public class BridgeTest extends ServiceTestBase
assertEquals(0, loadQueues(server0).size()); assertEquals(0, loadQueues(server0).size());
} }
@Test
public void testBridgeWithVeryLargeMessage() throws Exception
{
ActiveMQServer server0 = null;
ActiveMQServer server1 = null;
final int PAGE_MAX = 1024 * 1024;
final int PAGE_SIZE = 10 * 1024;
ServerLocator locator = null;
try
{
Map<String, Object> server0Params = new HashMap<String, Object>();
server0 = createClusteredServerWithParams(isNetty(), 0, true, PAGE_SIZE, PAGE_MAX, server0Params);
Map<String, Object> server1Params = new HashMap<String, Object>();
addTargetParameters(server1Params);
server1 = createClusteredServerWithParams(isNetty(), 1, true, server1Params);
final String testAddress = "testAddress";
final String queueName0 = "queue0";
final String forwardAddress = "forwardAddress";
final String queueName1 = "queue1";
Map<String, TransportConfiguration> connectors = new HashMap<String, TransportConfiguration>();
TransportConfiguration server0tc = new TransportConfiguration(getConnector(), server0Params);
TransportConfiguration server1tc = new TransportConfiguration(getConnector(), server1Params);
connectors.put(server1tc.getName(), server1tc);
server0.getConfiguration().setConnectorConfigurations(connectors);
ArrayList<String> staticConnectors = new ArrayList<String>();
staticConnectors.add(server1tc.getName());
int minLargeMessageSize = 1024 * 1024;
BridgeConfiguration bridgeConfiguration = new BridgeConfiguration()
.setName("bridge1")
.setQueueName(queueName0)
.setForwardingAddress(forwardAddress)
.setRetryInterval(1000)
.setReconnectAttemptsOnSameNode(-1)
.setUseDuplicateDetection(false)
.setConfirmationWindowSize(1024)
.setStaticConnectors(staticConnectors)
.setMinLargeMessageSize(minLargeMessageSize)
.setProducerWindowSize(minLargeMessageSize / 2);
List<BridgeConfiguration> bridgeConfigs = new ArrayList<BridgeConfiguration>();
bridgeConfigs.add(bridgeConfiguration);
server0.getConfiguration().setBridgeConfigurations(bridgeConfigs);
CoreQueueConfiguration queueConfig0 = new CoreQueueConfiguration()
.setAddress(testAddress)
.setName(queueName0);
List<CoreQueueConfiguration> queueConfigs0 = new ArrayList<CoreQueueConfiguration>();
queueConfigs0.add(queueConfig0);
server0.getConfiguration().setQueueConfigurations(queueConfigs0);
CoreQueueConfiguration queueConfig1 = new CoreQueueConfiguration()
.setAddress(forwardAddress)
.setName(queueName1);
List<CoreQueueConfiguration> queueConfigs1 = new ArrayList<CoreQueueConfiguration>();
queueConfigs1.add(queueConfig1);
server1.getConfiguration().setQueueConfigurations(queueConfigs1);
server1.start();
server0.start();
locator = addServerLocator(ActiveMQClient.createServerLocatorWithoutHA(server0tc, server1tc));
ClientSessionFactory sf0 = locator.createSessionFactory(server0tc);
ClientSessionFactory sf1 = locator.createSessionFactory(server1tc);
ClientSession session0 = sf0.createSession(false, true, true);
ClientSession session1 = sf1.createSession(false, true, true);
ClientProducer producer0 = session0.createProducer(new SimpleString(testAddress));
ClientConsumer consumer1 = session1.createConsumer(queueName1);
session1.start();
//create a large message bigger than Integer.MAX_VALUE
final long largeMessageSize = Integer.MAX_VALUE + 1000L;
ClientMessage largeMessage = createLargeMessage(session0, largeMessageSize);
producer0.send(largeMessage);
session0.commit();
//check target queue for large message arriving
ClientSession.QueueQuery query = session1.queueQuery(new SimpleString(queueName1));
long messageCount = query.getMessageCount();
int count = 0;
//wait for 300 sec max
while (messageCount == 0 && count < 300)
{
count++;
Thread.sleep(1000);
query = session1.queueQuery(new SimpleString(queueName1));
messageCount = query.getMessageCount();
}
if (messageCount == 0)
{
fail("large message didn't arrived after 5 min!");
}
//receive the message
ClientMessage message = consumer1.receive(5000);
message.acknowledge();
File outputFile = new File(getTemporaryDir(), "huge_message_received.dat");
System.out.println("-----message save to: " + outputFile.getAbsolutePath());
FileOutputStream fileOutputStream = new FileOutputStream(outputFile);
BufferedOutputStream bufferedOutput = new BufferedOutputStream(fileOutputStream);
message.setOutputStream(bufferedOutput);
if (!message.waitOutputStreamCompletion(5 * 60 * 1000))
{
fail("message didn't get received to disk in 5 min. Is the machine slow?");
}
session1.commit();
Assert.assertNull(consumer1.receiveImmediate());
session0.close();
session1.close();
sf0.close();
sf1.close();
}
finally
{
if (locator != null)
{
locator.close();
}
try
{
server0.stop();
}
catch (Throwable ignored)
{
}
try
{
server1.stop();
}
catch (Throwable ignored)
{
}
}
assertEquals(0, loadQueues(server0).size());
}
private ClientMessage createLargeMessage(ClientSession session, long largeMessageSize) throws Exception
{
File fileInput = new File(getTemporaryDir(), "huge_message_to_send.dat");
createFile(fileInput, largeMessageSize);
System.out.println("File created at: " + fileInput.getAbsolutePath());
ClientMessage message = session.createMessage(ClientMessage.BYTES_TYPE, true);
FileInputStream fileInputStream = new FileInputStream(fileInput);
BufferedInputStream bufferedInput = new BufferedInputStream(fileInputStream);
message.setBodyInputStream(bufferedInput);
return message;
}
private static void createFile(final File file, final long fileSize) throws IOException
{
if (file.exists())
{
System.out.println("---file already there " + file.length());
return;
}
FileOutputStream fileOut = new FileOutputStream(file);
BufferedOutputStream buffOut = new BufferedOutputStream(fileOut);
byte[] outBuffer = new byte[1024 * 1024];
System.out.println(" --- creating file, size: " + fileSize);
for (long i = 0; i < fileSize; i += outBuffer.length)
{
buffOut.write(outBuffer);
}
buffOut.close();
}
@Test @Test
public void testNullForwardingAddress() throws Exception public void testNullForwardingAddress() throws Exception
{ {

View File

@ -72,7 +72,7 @@ public class LinkedListTest extends UnitTestCase
LinkedListIterator<MyObject> iter = objs.iterator(); LinkedListIterator<MyObject> iter = objs.iterator();
for (int i = 0; i < 5000; i++) for (int i = 0; i < 1000; i++)
{ {
for (int add = 0; add < 1000; add++) for (int add = 0; add < 1000; add++)