ACTIVEMQ6-94: Using proper flow control on very large messages over the bridge
This will remove some of the verifications written by Howard on his commit. I did this to simplify the flow control This closes #197
This commit is contained in:
parent
c1111cc156
commit
ada112a6a3
|
@ -309,6 +309,10 @@ public final class ActiveMQDefaultConfiguration
|
|||
// Once the bridge has received this many bytes, it sends a confirmation
|
||||
private static int DEFAULT_BRIDGE_CONFIRMATION_WINDOW_SIZE = 1048576;
|
||||
|
||||
// Producer flow control is disabled by default on the bridge
|
||||
// You probably need to enable this if you use lots of huge messages
|
||||
private static int DEFAULT_BRIDGE_PRODUCER_WINDOW_SIZE = -1;
|
||||
|
||||
// Upon reconnection this configures the number of time the same node on the topology will be retried before reseting the server locator and using the initial connectors
|
||||
private static int DEFAULT_BRIDGE_CONNECT_SAME_NODE = 10;
|
||||
|
||||
|
@ -870,6 +874,16 @@ public final class ActiveMQDefaultConfiguration
|
|||
return DEFAULT_BRIDGE_CONFIRMATION_WINDOW_SIZE;
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* This default is used for both bridge and cluster connections (since they both translate to bridges) *
|
||||
* @return
|
||||
*/
|
||||
public static int getDefaultBridgeProducerWindowSize()
|
||||
{
|
||||
return DEFAULT_BRIDGE_PRODUCER_WINDOW_SIZE;
|
||||
}
|
||||
|
||||
/**
|
||||
* Upon reconnection this configures the number of time the same node on the topology will be retried before reseting the server locator and using the initial connectors
|
||||
*/
|
||||
|
|
|
@ -201,15 +201,4 @@ public interface Channel
|
|||
* @param transferring whether the channel is transferring
|
||||
*/
|
||||
void setTransferring(boolean transferring);
|
||||
|
||||
/**
|
||||
* for large message server send, each entry in resend cache will hold a reference to
|
||||
* a chunk of bytes which can cause OOM if the cache quickly build up. This method
|
||||
* make sure the resent cache size can't be more than one by blocking the call.
|
||||
*
|
||||
* @param timeout max waiting time for the resend cache
|
||||
*
|
||||
* @return true if the resend cache gets cleared
|
||||
*/
|
||||
boolean largeServerCheck(long timeout);
|
||||
}
|
||||
|
|
|
@ -461,14 +461,6 @@ public class ActiveMQSessionContext extends SessionContext
|
|||
else
|
||||
{
|
||||
sessionChannel.send(chunkPacket);
|
||||
if (!sessionChannel.largeServerCheck(MAX_RESENDCACHE_WAITING_TIME))
|
||||
{
|
||||
ActiveMQClientLogger.LOGGER.warn("Bridge detected that the target server is slow to " +
|
||||
" send back chunk confirmations. It 's possible the bridge may take more memory" +
|
||||
" during sending of a large message. It may be a temporary situation if this warning" +
|
||||
" occasionally shows up.");
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
return chunkPacket.getPacketSize();
|
||||
|
|
|
@ -226,27 +226,6 @@ public final class ChannelImpl implements Channel
|
|||
this.transferring = transferring;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean largeServerCheck(long timeout)
|
||||
{
|
||||
if (resendCache == null) return true;
|
||||
|
||||
synchronized (resendCache)
|
||||
{
|
||||
if (resendCache.size() >= 1)
|
||||
{
|
||||
try
|
||||
{
|
||||
resendCache.wait(timeout);
|
||||
}
|
||||
catch (InterruptedException e)
|
||||
{
|
||||
}
|
||||
}
|
||||
}
|
||||
return resendCache.size() == 0;
|
||||
}
|
||||
|
||||
// This must never called by more than one thread concurrently
|
||||
public boolean send(final Packet packet, final boolean flush, final boolean batch)
|
||||
{
|
||||
|
@ -628,12 +607,7 @@ public final class ChannelImpl implements Channel
|
|||
|
||||
firstStoredCommandID = 0;
|
||||
|
||||
synchronized (resendCache)
|
||||
{
|
||||
resendCache.clear();
|
||||
resendCache.notifyAll();
|
||||
}
|
||||
|
||||
resendCache.clear();
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -698,38 +672,28 @@ public final class ChannelImpl implements Channel
|
|||
|
||||
int sizeToFree = 0;
|
||||
|
||||
try
|
||||
for (int i = 0; i < numberToClear; i++)
|
||||
{
|
||||
for (int i = 0; i < numberToClear; i++)
|
||||
final Packet packet = resendCache.poll();
|
||||
|
||||
if (packet == null)
|
||||
{
|
||||
final Packet packet = resendCache.poll();
|
||||
|
||||
if (packet == null)
|
||||
if (lastReceivedCommandID > 0)
|
||||
{
|
||||
if (lastReceivedCommandID > 0)
|
||||
{
|
||||
ActiveMQClientLogger.LOGGER.cannotFindPacketToClear(lastReceivedCommandID, firstStoredCommandID);
|
||||
}
|
||||
firstStoredCommandID = lastReceivedCommandID + 1;
|
||||
return;
|
||||
}
|
||||
|
||||
if (packet.getType() != PacketImpl.PACKETS_CONFIRMED)
|
||||
{
|
||||
sizeToFree += packet.getPacketSize();
|
||||
}
|
||||
|
||||
if (commandConfirmationHandler != null)
|
||||
{
|
||||
commandConfirmationHandler.commandConfirmed(packet);
|
||||
ActiveMQClientLogger.LOGGER.cannotFindPacketToClear(lastReceivedCommandID, firstStoredCommandID);
|
||||
}
|
||||
firstStoredCommandID = lastReceivedCommandID + 1;
|
||||
return;
|
||||
}
|
||||
}
|
||||
finally
|
||||
{
|
||||
synchronized (resendCache)
|
||||
|
||||
if (packet.getType() != PacketImpl.PACKETS_CONFIRMED)
|
||||
{
|
||||
resendCache.notifyAll();
|
||||
sizeToFree += packet.getPacketSize();
|
||||
}
|
||||
|
||||
if (commandConfirmationHandler != null)
|
||||
{
|
||||
commandConfirmationHandler.commandConfirmed(packet);
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -56,6 +56,8 @@ public final class BridgeConfiguration implements Serializable
|
|||
|
||||
private int confirmationWindowSize = ActiveMQClient.DEFAULT_CONFIRMATION_WINDOW_SIZE;
|
||||
|
||||
private int producerWindowSize = ActiveMQDefaultConfiguration.getDefaultBridgeProducerWindowSize();
|
||||
|
||||
private long clientFailureCheckPeriod = ActiveMQClient.DEFAULT_CLIENT_FAILURE_CHECK_PERIOD;
|
||||
|
||||
private String user = ActiveMQDefaultConfiguration.getDefaultClusterUser();
|
||||
|
@ -302,6 +304,19 @@ public final class BridgeConfiguration implements Serializable
|
|||
return this;
|
||||
}
|
||||
|
||||
/** The producer flow control on the birdge */
|
||||
public BridgeConfiguration setProducerWindowSize(final int producerWindowSize)
|
||||
{
|
||||
this.producerWindowSize = producerWindowSize;
|
||||
return this;
|
||||
}
|
||||
|
||||
public int getProducerWindowSize()
|
||||
{
|
||||
return producerWindowSize;
|
||||
|
||||
}
|
||||
|
||||
public long getClientFailureCheckPeriod()
|
||||
{
|
||||
return clientFailureCheckPeriod;
|
||||
|
|
|
@ -63,6 +63,8 @@ public final class ClusterConnectionConfiguration implements Serializable
|
|||
|
||||
private int confirmationWindowSize = ActiveMQDefaultConfiguration.getDefaultClusterConfirmationWindowSize();
|
||||
|
||||
private int producerWindowSize = ActiveMQDefaultConfiguration.getDefaultBridgeProducerWindowSize();
|
||||
|
||||
private boolean allowDirectConnectionsOnly = false;
|
||||
|
||||
private int minLargeMessageSize = ActiveMQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE;
|
||||
|
@ -198,6 +200,18 @@ public final class ClusterConnectionConfiguration implements Serializable
|
|||
return this;
|
||||
}
|
||||
|
||||
|
||||
public int getProducerWindowSize()
|
||||
{
|
||||
return producerWindowSize;
|
||||
}
|
||||
|
||||
public ClusterConnectionConfiguration setProducerindowSize(int producerWindowSize)
|
||||
{
|
||||
this.producerWindowSize = producerWindowSize;
|
||||
return this;
|
||||
}
|
||||
|
||||
public List<String> getStaticConnectors()
|
||||
{
|
||||
return staticConnectors;
|
||||
|
|
|
@ -1417,6 +1417,10 @@ public final class FileConfigurationParser extends XMLConfigurationUtil
|
|||
getInteger(e, "confirmation-window-size", ActiveMQDefaultConfiguration.getDefaultClusterConfirmationWindowSize(),
|
||||
Validators.GT_ZERO);
|
||||
|
||||
int producerWindowSize =
|
||||
getInteger(e, "producer-window-size", ActiveMQDefaultConfiguration.getDefaultBridgeConfirmationWindowSize(),
|
||||
Validators.MINUS_ONE_OR_GT_ZERO);
|
||||
|
||||
long clusterNotificationInterval = getLong(e, "notification-interval", ActiveMQDefaultConfiguration.getDefaultClusterNotificationInterval(), Validators.GT_ZERO);
|
||||
|
||||
int clusterNotificationAttempts = getInteger(e, "notification-attempts", ActiveMQDefaultConfiguration.getDefaultClusterNotificationAttempts(), Validators.GT_ZERO);
|
||||
|
@ -1468,6 +1472,7 @@ public final class FileConfigurationParser extends XMLConfigurationUtil
|
|||
.setForwardWhenNoConsumers(forwardWhenNoConsumers)
|
||||
.setMaxHops(maxHops)
|
||||
.setConfirmationWindowSize(confirmationWindowSize)
|
||||
.setProducerindowSize(producerWindowSize)
|
||||
.setAllowDirectConnectionsOnly(allowDirectConnectionsOnly)
|
||||
.setClusterNotificationInterval(clusterNotificationInterval)
|
||||
.setClusterNotificationAttempts(clusterNotificationAttempts);
|
||||
|
@ -1549,6 +1554,10 @@ public final class FileConfigurationParser extends XMLConfigurationUtil
|
|||
getInteger(brNode, "reconnect-attempts-same-node", ActiveMQDefaultConfiguration.getDefaultBridgeConnectSameNode(),
|
||||
Validators.MINUS_ONE_OR_GE_ZERO);
|
||||
|
||||
int producerWindowSize =
|
||||
getInteger(brNode, "producer-window-size", ActiveMQDefaultConfiguration.getDefaultBridgeProducerWindowSize(),
|
||||
Validators.MINUS_ONE_OR_GE_ZERO);
|
||||
|
||||
boolean useDuplicateDetection = getBoolean(brNode,
|
||||
"use-duplicate-detection",
|
||||
ActiveMQDefaultConfiguration.isDefaultBridgeDuplicateDetection());
|
||||
|
@ -1630,7 +1639,8 @@ public final class FileConfigurationParser extends XMLConfigurationUtil
|
|||
.setConfirmationWindowSize(confirmationWindowSize)
|
||||
.setHA(ha)
|
||||
.setUser(user)
|
||||
.setPassword(password);
|
||||
.setPassword(password)
|
||||
.setProducerWindowSize(producerWindowSize);
|
||||
|
||||
if (!staticConnectorNames.isEmpty())
|
||||
{
|
||||
|
|
|
@ -529,8 +529,7 @@ public final class ClusterManager implements ActiveMQComponent
|
|||
serverLocator.setBlockOnDurableSend(!config.isUseDuplicateDetection());
|
||||
serverLocator.setBlockOnNonDurableSend(!config.isUseDuplicateDetection());
|
||||
serverLocator.setMinLargeMessageSize(config.getMinLargeMessageSize());
|
||||
//disable flow control
|
||||
serverLocator.setProducerWindowSize(-1);
|
||||
serverLocator.setProducerWindowSize(config.getProducerWindowSize());
|
||||
|
||||
// This will be set to 30s unless it's changed from embedded / testing
|
||||
// there is no reason to exception the config for this timeout
|
||||
|
@ -735,6 +734,7 @@ public final class ClusterManager implements ActiveMQComponent
|
|||
config.isDuplicateDetection(),
|
||||
config.isForwardWhenNoConsumers(),
|
||||
config.getConfirmationWindowSize(),
|
||||
config.getProducerWindowSize(),
|
||||
executorFactory,
|
||||
server,
|
||||
postOffice,
|
||||
|
@ -777,6 +777,7 @@ public final class ClusterManager implements ActiveMQComponent
|
|||
config.isDuplicateDetection(),
|
||||
config.isForwardWhenNoConsumers(),
|
||||
config.getConfirmationWindowSize(),
|
||||
config.getProducerWindowSize(),
|
||||
executorFactory,
|
||||
server,
|
||||
postOffice,
|
||||
|
|
|
@ -113,6 +113,8 @@ public final class ClusterConnectionImpl implements ClusterConnection, AfterConn
|
|||
|
||||
private final int confirmationWindowSize;
|
||||
|
||||
private final int producerWindowSize;
|
||||
|
||||
/**
|
||||
* Guard for the field {@link #records}. Note that the field is {@link ConcurrentHashMap},
|
||||
* however we need the guard to synchronize multiple step operations during topology updates.
|
||||
|
@ -179,6 +181,7 @@ public final class ClusterConnectionImpl implements ClusterConnection, AfterConn
|
|||
final boolean useDuplicateDetection,
|
||||
final boolean routeWhenNoConsumers,
|
||||
final int confirmationWindowSize,
|
||||
final int producerWindowSize,
|
||||
final ExecutorFactory executorFactory,
|
||||
final ActiveMQServer server,
|
||||
final PostOffice postOffice,
|
||||
|
@ -220,6 +223,8 @@ public final class ClusterConnectionImpl implements ClusterConnection, AfterConn
|
|||
|
||||
this.confirmationWindowSize = confirmationWindowSize;
|
||||
|
||||
this.producerWindowSize = producerWindowSize;
|
||||
|
||||
this.executorFactory = executorFactory;
|
||||
|
||||
this.clusterNotificationInterval = clusterNotificationInterval;
|
||||
|
@ -286,6 +291,7 @@ public final class ClusterConnectionImpl implements ClusterConnection, AfterConn
|
|||
final boolean useDuplicateDetection,
|
||||
final boolean routeWhenNoConsumers,
|
||||
final int confirmationWindowSize,
|
||||
final int producerWindowSize,
|
||||
final ExecutorFactory executorFactory,
|
||||
final ActiveMQServer server,
|
||||
final PostOffice postOffice,
|
||||
|
@ -333,6 +339,8 @@ public final class ClusterConnectionImpl implements ClusterConnection, AfterConn
|
|||
|
||||
this.confirmationWindowSize = confirmationWindowSize;
|
||||
|
||||
this.producerWindowSize = producerWindowSize;
|
||||
|
||||
this.executorFactory = executorFactory;
|
||||
|
||||
this.clusterNotificationInterval = clusterNotificationInterval;
|
||||
|
@ -637,8 +645,7 @@ public final class ClusterConnectionImpl implements ClusterConnection, AfterConn
|
|||
serverLocator.setBlockOnNonDurableSend(!useDuplicateDetection);
|
||||
serverLocator.setCallTimeout(callTimeout);
|
||||
serverLocator.setCallFailoverTimeout(callFailoverTimeout);
|
||||
// No producer flow control on the bridges, as we don't want to lock the queues
|
||||
serverLocator.setProducerWindowSize(-1);
|
||||
serverLocator.setProducerWindowSize(producerWindowSize);
|
||||
|
||||
if (retryInterval > 0)
|
||||
{
|
||||
|
|
|
@ -1,4 +1,4 @@
|
|||
<?xml version="1.0" encoding="UTF-8"?>
|
||||
<?xml version="1.0" encoding="UTF-8"?>
|
||||
<!--
|
||||
Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
contributor license agreements. See the NOTICE file distributed with
|
||||
|
@ -287,7 +287,7 @@
|
|||
</xsd:documentation>
|
||||
</xsd:annotation>
|
||||
</xsd:element>
|
||||
|
||||
|
||||
<xsd:element name="remoting-incoming-interceptors" type="class-name-sequenceType" maxOccurs="1" minOccurs="0">
|
||||
<xsd:annotation>
|
||||
<xsd:documentation>
|
||||
|
@ -1049,6 +1049,14 @@
|
|||
</xsd:annotation>
|
||||
</xsd:element>
|
||||
|
||||
<xsd:element name="producer-window-size" type="xsd:int" maxOccurs="1" minOccurs="0" default="1048576">
|
||||
<xsd:annotation>
|
||||
<xsd:documentation>
|
||||
Producer flow control on the bridge (default disabled = -1)
|
||||
</xsd:documentation>
|
||||
</xsd:annotation>
|
||||
</xsd:element>
|
||||
|
||||
<xsd:element name="user" type="xsd:string" maxOccurs="1" minOccurs="0">
|
||||
<xsd:annotation>
|
||||
<xsd:documentation>
|
||||
|
@ -1231,6 +1239,14 @@
|
|||
</xsd:annotation>
|
||||
</xsd:element>
|
||||
|
||||
<xsd:element name="producer-window-size" type="xsd:int" maxOccurs="1" minOccurs="0" default="1048576">
|
||||
<xsd:annotation>
|
||||
<xsd:documentation>
|
||||
Producer flow control on the bridge (default disabled = -1)
|
||||
</xsd:documentation>
|
||||
</xsd:annotation>
|
||||
</xsd:element>
|
||||
|
||||
<xsd:element name="call-failover-timeout" type="xsd:long" default="-1" maxOccurs="1" minOccurs="0">
|
||||
<xsd:annotation>
|
||||
<xsd:documentation>
|
||||
|
|
|
@ -223,6 +223,7 @@ public class FileConfigurationTest extends ConfigurationImplTest
|
|||
Assert.assertEquals(true, bc.isUseDuplicateDetection());
|
||||
Assert.assertEquals("connector1", bc.getStaticConnectors().get(0));
|
||||
Assert.assertEquals(null, bc.getDiscoveryGroupName());
|
||||
Assert.assertEquals(444, bc.getProducerWindowSize());
|
||||
}
|
||||
else
|
||||
{
|
||||
|
@ -233,6 +234,7 @@ public class FileConfigurationTest extends ConfigurationImplTest
|
|||
Assert.assertEquals(null, bc.getTransformerClassName());
|
||||
Assert.assertEquals(null, bc.getStaticConnectors());
|
||||
Assert.assertEquals("dg1", bc.getDiscoveryGroupName());
|
||||
Assert.assertEquals(555, bc.getProducerWindowSize());
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -267,6 +269,7 @@ public class FileConfigurationTest extends ConfigurationImplTest
|
|||
Assert.assertEquals("connector1", ccc.getStaticConnectors().get(0));
|
||||
Assert.assertEquals("connector2", ccc.getStaticConnectors().get(1));
|
||||
Assert.assertEquals(null, ccc.getDiscoveryGroupName());
|
||||
Assert.assertEquals(222, ccc.getProducerWindowSize());
|
||||
}
|
||||
else
|
||||
{
|
||||
|
@ -280,6 +283,7 @@ public class FileConfigurationTest extends ConfigurationImplTest
|
|||
Assert.assertEquals(2, ccc.getMaxHops());
|
||||
Assert.assertEquals(Collections.emptyList(), ccc.getStaticConnectors());
|
||||
Assert.assertEquals("dg1", ccc.getDiscoveryGroupName());
|
||||
Assert.assertEquals(333, ccc.getProducerWindowSize());
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -142,6 +142,7 @@
|
|||
<reconnect-attempts>2</reconnect-attempts>
|
||||
<failover-on-server-shutdown>false</failover-on-server-shutdown>
|
||||
<use-duplicate-detection>true</use-duplicate-detection>
|
||||
<producer-window-size>444</producer-window-size>
|
||||
<static-connectors>
|
||||
<connector-ref>connector1</connector-ref>
|
||||
</static-connectors>
|
||||
|
@ -149,6 +150,7 @@
|
|||
<bridge name="bridge2">
|
||||
<queue-name>queue2</queue-name>
|
||||
<forwarding-address>bridge-forwarding-address2</forwarding-address>
|
||||
<producer-window-size>555</producer-window-size>
|
||||
<discovery-group-ref discovery-group-name="dg1"/>
|
||||
</bridge>
|
||||
</bridges>
|
||||
|
@ -167,24 +169,25 @@
|
|||
</ha-policy>
|
||||
<cluster-connections>
|
||||
<cluster-connection name="cluster-connection1">
|
||||
<address>queues1</address>
|
||||
<connector-ref>connector1</connector-ref>
|
||||
<check-period>331</check-period>
|
||||
<connection-ttl>3370</connection-ttl>
|
||||
<min-large-message-size>321</min-large-message-size>
|
||||
<call-timeout>123</call-timeout>
|
||||
<retry-interval>3</retry-interval>
|
||||
<retry-interval-multiplier>0.25</retry-interval-multiplier>
|
||||
<max-retry-interval>10000</max-retry-interval>
|
||||
<reconnect-attempts>72</reconnect-attempts>
|
||||
<use-duplicate-detection>true</use-duplicate-detection>
|
||||
<forward-when-no-consumers>false</forward-when-no-consumers>
|
||||
<max-hops>1</max-hops>
|
||||
<call-failover-timeout>123</call-failover-timeout>
|
||||
<static-connectors>
|
||||
<address>queues1</address>
|
||||
<connector-ref>connector1</connector-ref>
|
||||
<connector-ref>connector2</connector-ref>
|
||||
</static-connectors>
|
||||
<check-period>331</check-period>
|
||||
<connection-ttl>3370</connection-ttl>
|
||||
<min-large-message-size>321</min-large-message-size>
|
||||
<call-timeout>123</call-timeout>
|
||||
<retry-interval>3</retry-interval>
|
||||
<retry-interval-multiplier>0.25</retry-interval-multiplier>
|
||||
<max-retry-interval>10000</max-retry-interval>
|
||||
<reconnect-attempts>72</reconnect-attempts>
|
||||
<use-duplicate-detection>true</use-duplicate-detection>
|
||||
<forward-when-no-consumers>false</forward-when-no-consumers>
|
||||
<max-hops>1</max-hops>
|
||||
<producer-window-size>222</producer-window-size>
|
||||
<call-failover-timeout>123</call-failover-timeout>
|
||||
<static-connectors>
|
||||
<connector-ref>connector1</connector-ref>
|
||||
<connector-ref>connector2</connector-ref>
|
||||
</static-connectors>
|
||||
</cluster-connection>
|
||||
<cluster-connection name="cluster-connection2">
|
||||
<address>queues2</address>
|
||||
|
@ -194,6 +197,7 @@
|
|||
<use-duplicate-detection>false</use-duplicate-detection>
|
||||
<forward-when-no-consumers>true</forward-when-no-consumers>
|
||||
<max-hops>2</max-hops>
|
||||
<producer-window-size>333</producer-window-size>
|
||||
<call-failover-timeout>456</call-failover-timeout>
|
||||
<discovery-group-ref discovery-group-name="dg1"/>
|
||||
</cluster-connection>
|
||||
|
|
|
@ -736,6 +736,10 @@ specified. The following shows all the available configuration options
|
|||
server has received `confirmation-window-size` bytes it notifies its
|
||||
client, default is 1048576. A value of -1 means no window.
|
||||
|
||||
- `producer-window-size`. The size for producer flow control over cluster connection.
|
||||
it's by default disabled through the cluster connection bridge but you may want
|
||||
to set a value if you are using really large messages in cluster. A value of -1 means no window.
|
||||
|
||||
- `call-failover-timeout`. Similar to `call-timeout` but used when a
|
||||
call is made during a failover attempt. Default is -1 (no timeout).
|
||||
|
||||
|
|
|
@ -127,6 +127,7 @@ Name | Description
|
|||
[reconnect-attempts](core-bridges.md "Chapter 36. Core Bridges") | maximum number of retry attempts, -1 means 'no limits'. default -1
|
||||
[use-duplicate-detection](core-bridges.md "Chapter 36. Core Bridges") | forward duplicate detection headers?. default true
|
||||
[confirmation-window-size](core-bridges.md "Chapter 36. Core Bridges") | number of bytes before confirmations are sent. default 1MB
|
||||
[producer-window-size](core-bridges.md "Chapter 36. Core Bridges") | Producer flow control size on the bridge. Default -1 (disabled)
|
||||
[user](core-bridges.md "Chapter 36. Core Bridges") | Username for the bridge, the default is the cluster username
|
||||
[password](core-bridges.md "Chapter 36. Core Bridges") | Password for the bridge, default is the cluster password
|
||||
[reconnect-attempts-same-node](core-bridges.md "Chapter 36. Core Bridges") | Number of retries before trying another node. default 10
|
||||
|
@ -165,6 +166,7 @@ Name | Description
|
|||
[forward-when-no-consumers](clusters.md "Chapter 38. Clusters") | should messages be load balanced if there are no matching consumers on target? Default=false
|
||||
[max-hops](clusters.md "Chapter 38. Clusters") | maximum number of hops cluster topology is propagated. Default=1
|
||||
[confirmation-window-size](client-reconnection.md "Chapter 34. Client Reconnection and Session Reattachment")| The size (in bytes) of the window used for confirming data from the server connected to. Default 1048576
|
||||
[producer-window-size](clusters.md "Chapter 38. Clusters") | Flow Control for the Cluster connection bridge. Default -1 (disabled)
|
||||
[call-failover-timeout](clusters.md "38.3.1. Configuring Cluster Connections") | How long to wait for a reply if in the middle of a fail-over. -1 means wait forever. Default -1
|
||||
[notification-interval](clusters.md "Chapter 38. Clusters") | how often the cluster connection will notify the cluster of its existence right after joining the cluster. Default 1000
|
||||
[notification-attempts](clusters.md "Chapter 38. Clusters") | how many times this cluster connection will notify the cluster of its existence right after joining the cluster Default 2
|
||||
|
|
|
@ -195,6 +195,12 @@ Let's take a look at all the parameters in turn:
|
|||
> `confirmation-window-size` is less than or equal to
|
||||
> `max-size-bytes` to prevent the flow of messages from ceasing.
|
||||
|
||||
- `producer-window-size`. This optional parameter determines the
|
||||
producer flow control through the bridge. You usually leave this off
|
||||
unless you are dealing with huge large messages.
|
||||
|
||||
Default=-1 (disabled)
|
||||
|
||||
- `user`. This optional parameter determines the user name to use when
|
||||
creating the bridge connection to the remote server. If it is not
|
||||
specified the default cluster user specified by `cluster-user` in
|
||||
|
|
|
@ -1795,7 +1795,7 @@ public class BridgeTest extends ServiceTestBase
|
|||
ArrayList<String> staticConnectors = new ArrayList<String>();
|
||||
staticConnectors.add(server1tc.getName());
|
||||
|
||||
int minLargeMessageSize = 50 * 1024 * 1024; //50M
|
||||
int minLargeMessageSize = 1024 * 1024;
|
||||
|
||||
BridgeConfiguration bridgeConfiguration = new BridgeConfiguration()
|
||||
.setName("bridge1")
|
||||
|
@ -1806,7 +1806,8 @@ public class BridgeTest extends ServiceTestBase
|
|||
.setUseDuplicateDetection(false)
|
||||
.setConfirmationWindowSize(1024)
|
||||
.setStaticConnectors(staticConnectors)
|
||||
.setMinLargeMessageSize(minLargeMessageSize);
|
||||
.setMinLargeMessageSize(minLargeMessageSize)
|
||||
.setProducerWindowSize(minLargeMessageSize / 2);
|
||||
|
||||
List<BridgeConfiguration> bridgeConfigs = new ArrayList<BridgeConfiguration>();
|
||||
bridgeConfigs.add(bridgeConfiguration);
|
||||
|
@ -1847,10 +1848,9 @@ public class BridgeTest extends ServiceTestBase
|
|||
session1.start();
|
||||
|
||||
//create a large message bigger than Integer.MAX_VALUE
|
||||
final long largeMessageSize = 3L * 1024L * 1024L * 1024L;
|
||||
final long largeMessageSize = Integer.MAX_VALUE + 1000L;
|
||||
|
||||
File destDir = createDestDir("testBridgeWithVeryLargeMessage");
|
||||
ClientMessage largeMessage = createLargeMessage(session0, largeMessageSize, destDir);
|
||||
ClientMessage largeMessage = createLargeMessage(session0, largeMessageSize);
|
||||
|
||||
producer0.send(largeMessage);
|
||||
|
||||
|
@ -1878,7 +1878,7 @@ public class BridgeTest extends ServiceTestBase
|
|||
ClientMessage message = consumer1.receive(5000);
|
||||
message.acknowledge();
|
||||
|
||||
File outputFile = new File(destDir, "huge_message_received.dat");
|
||||
File outputFile = new File(getTemporaryDir(), "huge_message_received.dat");
|
||||
|
||||
System.out.println("-----message save to: " + outputFile.getAbsolutePath());
|
||||
FileOutputStream fileOutputStream = new FileOutputStream(outputFile);
|
||||
|
@ -1930,30 +1930,10 @@ public class BridgeTest extends ServiceTestBase
|
|||
assertEquals(0, loadQueues(server0).size());
|
||||
}
|
||||
|
||||
private File createDestDir(String dirName)
|
||||
{
|
||||
File clientDir = new File(getClientLargeMessagesDir());
|
||||
if (!clientDir.exists())
|
||||
{
|
||||
if (!clientDir.mkdirs())
|
||||
{
|
||||
throw new IllegalStateException("Can't create dir " + clientDir.getAbsolutePath());
|
||||
}
|
||||
}
|
||||
|
||||
File destDir = new File(clientDir, dirName);
|
||||
if (!destDir.mkdir())
|
||||
{
|
||||
throw new IllegalStateException("Can't create dir " + destDir.getAbsolutePath());
|
||||
}
|
||||
return destDir;
|
||||
}
|
||||
|
||||
|
||||
private ClientMessage createLargeMessage(ClientSession session, long largeMessageSize, File destDir) throws Exception
|
||||
private ClientMessage createLargeMessage(ClientSession session, long largeMessageSize) throws Exception
|
||||
{
|
||||
|
||||
File fileInput = new File(destDir, "huge_message_to_send.dat");
|
||||
File fileInput = new File(getTemporaryDir(), "huge_message_to_send.dat");
|
||||
|
||||
createFile(fileInput, largeMessageSize);
|
||||
|
||||
|
|
|
@ -375,12 +375,6 @@ public class BackupSyncDelay implements Interceptor
|
|||
throw new UnsupportedOperationException();
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean largeServerCheck(long timeout)
|
||||
{
|
||||
return true;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean supports(byte packetID)
|
||||
{
|
||||
|
|
Loading…
Reference in New Issue