From 147a5528e4e3c5f658357ef4796b21fc4a466c18 Mon Sep 17 00:00:00 2001 From: Clebert Suconic Date: Thu, 23 Apr 2015 15:01:44 -0400 Subject: [PATCH 1/3] changing iterations on test so it runs faster --- .../org/apache/activemq/tests/unit/util/LinkedListTest.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/unit-tests/src/test/java/org/apache/activemq/tests/unit/util/LinkedListTest.java b/tests/unit-tests/src/test/java/org/apache/activemq/tests/unit/util/LinkedListTest.java index 6a3d6faef3..8232183105 100644 --- a/tests/unit-tests/src/test/java/org/apache/activemq/tests/unit/util/LinkedListTest.java +++ b/tests/unit-tests/src/test/java/org/apache/activemq/tests/unit/util/LinkedListTest.java @@ -72,7 +72,7 @@ public class LinkedListTest extends UnitTestCase LinkedListIterator iter = objs.iterator(); - for (int i = 0; i < 5000; i++) + for (int i = 0; i < 1000; i++) { for (int add = 0; add < 1000; add++) From c1111cc156684b15938ab3f8e34df9f4b64f57c4 Mon Sep 17 00:00:00 2001 From: Howard Gao Date: Mon, 30 Mar 2015 14:09:34 +0800 Subject: [PATCH 2/3] ACTIVEMQ6-94: HornetQ Bridge does not handle large messages When sending a large message that exceeds the size of Integer.MAX_VALUE, the bridge will get negative chunk size during fowarding. And the resend cache is not limited so there is a potential that it may get OutOfMemory exception. --- .../core/client/impl/ClientProducerImpl.java | 6 +- .../activemq/core/protocol/core/Channel.java | 11 + .../core/impl/ActiveMQSessionContext.java | 30 +++ .../core/protocol/core/impl/ChannelImpl.java | 68 +++-- .../spi/core/remoting/SessionContext.java | 2 + .../cluster/bridge/BridgeTest.java | 234 ++++++++++++++++++ .../cluster/util/BackupSyncDelay.java | 6 + 7 files changed, 338 insertions(+), 19 deletions(-) diff --git a/activemq-core-client/src/main/java/org/apache/activemq/core/client/impl/ClientProducerImpl.java b/activemq-core-client/src/main/java/org/apache/activemq/core/client/impl/ClientProducerImpl.java index 7623cdb1a8..3ff5faee8f 100644 --- a/activemq-core-client/src/main/java/org/apache/activemq/core/client/impl/ClientProducerImpl.java +++ b/activemq-core-client/src/main/java/org/apache/activemq/core/client/impl/ClientProducerImpl.java @@ -415,11 +415,11 @@ public class ClientProducerImpl implements ClientProducerInternal try { - for (int pos = 0; pos < bodySize; ) + for (long pos = 0; pos < bodySize; ) { final boolean lastChunk; - final int chunkLength = Math.min((int) (bodySize - pos), minLargeMessageSize); + final int chunkLength = (int)Math.min((bodySize - pos), (long)minLargeMessageSize); final ActiveMQBuffer bodyBuffer = ActiveMQBuffers.fixedBuffer(chunkLength); @@ -430,7 +430,7 @@ public class ClientProducerImpl implements ClientProducerInternal lastChunk = pos >= bodySize; SendAcknowledgementHandler messageHandler = lastChunk ? handler : null; - int creditsUsed = sessionContext.sendLargeMessageChunk(msgI, -1, sendBlocking, lastChunk, bodyBuffer.toByteBuffer().array(), messageHandler); + int creditsUsed = sessionContext.sendServerLargeMessageChunk(msgI, -1, sendBlocking, lastChunk, bodyBuffer.toByteBuffer().array(), messageHandler); try { diff --git a/activemq-core-client/src/main/java/org/apache/activemq/core/protocol/core/Channel.java b/activemq-core-client/src/main/java/org/apache/activemq/core/protocol/core/Channel.java index 75469522c5..c876419462 100644 --- a/activemq-core-client/src/main/java/org/apache/activemq/core/protocol/core/Channel.java +++ b/activemq-core-client/src/main/java/org/apache/activemq/core/protocol/core/Channel.java @@ -201,4 +201,15 @@ public interface Channel * @param transferring whether the channel is transferring */ void setTransferring(boolean transferring); + + /** + * for large message server send, each entry in resend cache will hold a reference to + * a chunk of bytes which can cause OOM if the cache quickly build up. This method + * make sure the resent cache size can't be more than one by blocking the call. + * + * @param timeout max waiting time for the resend cache + * + * @return true if the resend cache gets cleared + */ + boolean largeServerCheck(long timeout); } diff --git a/activemq-core-client/src/main/java/org/apache/activemq/core/protocol/core/impl/ActiveMQSessionContext.java b/activemq-core-client/src/main/java/org/apache/activemq/core/protocol/core/impl/ActiveMQSessionContext.java index 8df00114cb..a3d532da44 100644 --- a/activemq-core-client/src/main/java/org/apache/activemq/core/protocol/core/impl/ActiveMQSessionContext.java +++ b/activemq-core-client/src/main/java/org/apache/activemq/core/protocol/core/impl/ActiveMQSessionContext.java @@ -109,6 +109,7 @@ import static org.apache.activemq.core.protocol.core.impl.PacketImpl.SESS_RECEIV public class ActiveMQSessionContext extends SessionContext { + private static final long MAX_RESENDCACHE_WAITING_TIME = 10000L;//10 sec private final Channel sessionChannel; private final int serverVersion; private int confirmationWindow; @@ -425,6 +426,27 @@ public class ActiveMQSessionContext extends SessionContext @Override public int sendLargeMessageChunk(MessageInternal msgI, long messageBodySize, boolean sendBlocking, boolean lastChunk, byte[] chunk, SendAcknowledgementHandler messageHandler) throws ActiveMQException + { + final boolean requiresResponse = lastChunk && sendBlocking; + final SessionSendContinuationMessage chunkPacket = + new SessionSendContinuationMessage(msgI, chunk, !lastChunk, + requiresResponse, messageBodySize, messageHandler); + + if (requiresResponse) + { + // When sending it blocking, only the last chunk will be blocking. + sessionChannel.sendBlocking(chunkPacket, PacketImpl.NULL_RESPONSE); + } + else + { + sessionChannel.send(chunkPacket); + } + + return chunkPacket.getPacketSize(); + } + + @Override + public int sendServerLargeMessageChunk(MessageInternal msgI, long messageBodySize, boolean sendBlocking, boolean lastChunk, byte[] chunk, SendAcknowledgementHandler messageHandler) throws ActiveMQException { final boolean requiresResponse = lastChunk && sendBlocking; final SessionSendContinuationMessage chunkPacket = @@ -439,6 +461,14 @@ public class ActiveMQSessionContext extends SessionContext else { sessionChannel.send(chunkPacket); + if (!sessionChannel.largeServerCheck(MAX_RESENDCACHE_WAITING_TIME)) + { + ActiveMQClientLogger.LOGGER.warn("Bridge detected that the target server is slow to " + + " send back chunk confirmations. It 's possible the bridge may take more memory" + + " during sending of a large message. It may be a temporary situation if this warning" + + " occasionally shows up."); + } + } return chunkPacket.getPacketSize(); diff --git a/activemq-core-client/src/main/java/org/apache/activemq/core/protocol/core/impl/ChannelImpl.java b/activemq-core-client/src/main/java/org/apache/activemq/core/protocol/core/impl/ChannelImpl.java index a71aed562a..25f6f817d7 100644 --- a/activemq-core-client/src/main/java/org/apache/activemq/core/protocol/core/impl/ChannelImpl.java +++ b/activemq-core-client/src/main/java/org/apache/activemq/core/protocol/core/impl/ChannelImpl.java @@ -226,6 +226,27 @@ public final class ChannelImpl implements Channel this.transferring = transferring; } + @Override + public boolean largeServerCheck(long timeout) + { + if (resendCache == null) return true; + + synchronized (resendCache) + { + if (resendCache.size() >= 1) + { + try + { + resendCache.wait(timeout); + } + catch (InterruptedException e) + { + } + } + } + return resendCache.size() == 0; + } + // This must never called by more than one thread concurrently public boolean send(final Packet packet, final boolean flush, final boolean batch) { @@ -607,7 +628,12 @@ public final class ChannelImpl implements Channel firstStoredCommandID = 0; - resendCache.clear(); + synchronized (resendCache) + { + resendCache.clear(); + resendCache.notifyAll(); + } + } } @@ -672,28 +698,38 @@ public final class ChannelImpl implements Channel int sizeToFree = 0; - for (int i = 0; i < numberToClear; i++) + try { - final Packet packet = resendCache.poll(); - - if (packet == null) + for (int i = 0; i < numberToClear; i++) { - if (lastReceivedCommandID > 0) + final Packet packet = resendCache.poll(); + + if (packet == null) { - ActiveMQClientLogger.LOGGER.cannotFindPacketToClear(lastReceivedCommandID, firstStoredCommandID); + if (lastReceivedCommandID > 0) + { + ActiveMQClientLogger.LOGGER.cannotFindPacketToClear(lastReceivedCommandID, firstStoredCommandID); + } + firstStoredCommandID = lastReceivedCommandID + 1; + return; } - firstStoredCommandID = lastReceivedCommandID + 1; - return; - } - if (packet.getType() != PacketImpl.PACKETS_CONFIRMED) - { - sizeToFree += packet.getPacketSize(); - } + if (packet.getType() != PacketImpl.PACKETS_CONFIRMED) + { + sizeToFree += packet.getPacketSize(); + } - if (commandConfirmationHandler != null) + if (commandConfirmationHandler != null) + { + commandConfirmationHandler.commandConfirmed(packet); + } + } + } + finally + { + synchronized (resendCache) { - commandConfirmationHandler.commandConfirmed(packet); + resendCache.notifyAll(); } } diff --git a/activemq-core-client/src/main/java/org/apache/activemq/spi/core/remoting/SessionContext.java b/activemq-core-client/src/main/java/org/apache/activemq/spi/core/remoting/SessionContext.java index af0d4f3bae..c642837b21 100644 --- a/activemq-core-client/src/main/java/org/apache/activemq/spi/core/remoting/SessionContext.java +++ b/activemq-core-client/src/main/java/org/apache/activemq/spi/core/remoting/SessionContext.java @@ -149,6 +149,8 @@ public abstract class SessionContext public abstract int sendLargeMessageChunk(MessageInternal msgI, long messageBodySize, boolean sendBlocking, boolean lastChunk, byte[] chunk, SendAcknowledgementHandler messageHandler) throws ActiveMQException; + public abstract int sendServerLargeMessageChunk(MessageInternal msgI, long messageBodySize, boolean sendBlocking, boolean lastChunk, byte[] chunk, SendAcknowledgementHandler messageHandler) throws ActiveMQException; + public abstract void setSendAcknowledgementHandler(final SendAcknowledgementHandler handler); diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/tests/integration/cluster/bridge/BridgeTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/tests/integration/cluster/bridge/BridgeTest.java index f9ac02425d..ff4adf3aac 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/tests/integration/cluster/bridge/BridgeTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/tests/integration/cluster/bridge/BridgeTest.java @@ -16,6 +16,12 @@ */ package org.apache.activemq.tests.integration.cluster.bridge; +import java.io.BufferedInputStream; +import java.io.BufferedOutputStream; +import java.io.File; +import java.io.FileInputStream; +import java.io.FileOutputStream; +import java.io.IOException; import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; @@ -1753,6 +1759,234 @@ public class BridgeTest extends ServiceTestBase assertEquals(0, loadQueues(server0).size()); } + @Test + public void testBridgeWithVeryLargeMessage() throws Exception + { + ActiveMQServer server0 = null; + ActiveMQServer server1 = null; + + final int PAGE_MAX = 1024 * 1024; + + final int PAGE_SIZE = 10 * 1024; + ServerLocator locator = null; + try + { + + Map server0Params = new HashMap(); + server0 = createClusteredServerWithParams(isNetty(), 0, true, PAGE_SIZE, PAGE_MAX, server0Params); + + Map server1Params = new HashMap(); + addTargetParameters(server1Params); + server1 = createClusteredServerWithParams(isNetty(), 1, true, server1Params); + + final String testAddress = "testAddress"; + final String queueName0 = "queue0"; + final String forwardAddress = "forwardAddress"; + final String queueName1 = "queue1"; + + Map connectors = new HashMap(); + TransportConfiguration server0tc = new TransportConfiguration(getConnector(), server0Params); + + TransportConfiguration server1tc = new TransportConfiguration(getConnector(), server1Params); + connectors.put(server1tc.getName(), server1tc); + + server0.getConfiguration().setConnectorConfigurations(connectors); + + ArrayList staticConnectors = new ArrayList(); + staticConnectors.add(server1tc.getName()); + + int minLargeMessageSize = 50 * 1024 * 1024; //50M + + BridgeConfiguration bridgeConfiguration = new BridgeConfiguration() + .setName("bridge1") + .setQueueName(queueName0) + .setForwardingAddress(forwardAddress) + .setRetryInterval(1000) + .setReconnectAttemptsOnSameNode(-1) + .setUseDuplicateDetection(false) + .setConfirmationWindowSize(1024) + .setStaticConnectors(staticConnectors) + .setMinLargeMessageSize(minLargeMessageSize); + + List bridgeConfigs = new ArrayList(); + bridgeConfigs.add(bridgeConfiguration); + server0.getConfiguration().setBridgeConfigurations(bridgeConfigs); + + CoreQueueConfiguration queueConfig0 = new CoreQueueConfiguration() + .setAddress(testAddress) + .setName(queueName0); + List queueConfigs0 = new ArrayList(); + queueConfigs0.add(queueConfig0); + server0.getConfiguration().setQueueConfigurations(queueConfigs0); + + + CoreQueueConfiguration queueConfig1 = new CoreQueueConfiguration() + .setAddress(forwardAddress) + .setName(queueName1); + List queueConfigs1 = new ArrayList(); + queueConfigs1.add(queueConfig1); + server1.getConfiguration().setQueueConfigurations(queueConfigs1); + + server1.start(); + server0.start(); + + locator = addServerLocator(ActiveMQClient.createServerLocatorWithoutHA(server0tc, server1tc)); + + ClientSessionFactory sf0 = locator.createSessionFactory(server0tc); + + ClientSessionFactory sf1 = locator.createSessionFactory(server1tc); + + ClientSession session0 = sf0.createSession(false, true, true); + + ClientSession session1 = sf1.createSession(false, true, true); + + ClientProducer producer0 = session0.createProducer(new SimpleString(testAddress)); + + ClientConsumer consumer1 = session1.createConsumer(queueName1); + + session1.start(); + + //create a large message bigger than Integer.MAX_VALUE + final long largeMessageSize = 3L * 1024L * 1024L * 1024L; + + File destDir = createDestDir("testBridgeWithVeryLargeMessage"); + ClientMessage largeMessage = createLargeMessage(session0, largeMessageSize, destDir); + + producer0.send(largeMessage); + + session0.commit(); + + //check target queue for large message arriving + ClientSession.QueueQuery query = session1.queueQuery(new SimpleString(queueName1)); + long messageCount = query.getMessageCount(); + int count = 0; + //wait for 300 sec max + while (messageCount == 0 && count < 300) + { + count++; + Thread.sleep(1000); + query = session1.queueQuery(new SimpleString(queueName1)); + messageCount = query.getMessageCount(); + } + + if (messageCount == 0) + { + fail("large message didn't arrived after 5 min!"); + } + + //receive the message + ClientMessage message = consumer1.receive(5000); + message.acknowledge(); + + File outputFile = new File(destDir, "huge_message_received.dat"); + + System.out.println("-----message save to: " + outputFile.getAbsolutePath()); + FileOutputStream fileOutputStream = new FileOutputStream(outputFile); + + BufferedOutputStream bufferedOutput = new BufferedOutputStream(fileOutputStream); + + message.setOutputStream(bufferedOutput); + + if (!message.waitOutputStreamCompletion(5 * 60 * 1000)) + { + fail("message didn't get received to disk in 5 min. Is the machine slow?"); + } + session1.commit(); + + Assert.assertNull(consumer1.receiveImmediate()); + + session0.close(); + + session1.close(); + + sf0.close(); + + sf1.close(); + + } + finally + { + if (locator != null) + { + locator.close(); + } + try + { + server0.stop(); + } + catch (Throwable ignored) + { + } + + try + { + server1.stop(); + } + catch (Throwable ignored) + { + } + } + + assertEquals(0, loadQueues(server0).size()); + } + + private File createDestDir(String dirName) + { + File clientDir = new File(getClientLargeMessagesDir()); + if (!clientDir.exists()) + { + if (!clientDir.mkdirs()) + { + throw new IllegalStateException("Can't create dir " + clientDir.getAbsolutePath()); + } + } + + File destDir = new File(clientDir, dirName); + if (!destDir.mkdir()) + { + throw new IllegalStateException("Can't create dir " + destDir.getAbsolutePath()); + } + return destDir; + } + + + private ClientMessage createLargeMessage(ClientSession session, long largeMessageSize, File destDir) throws Exception + { + + File fileInput = new File(destDir, "huge_message_to_send.dat"); + + createFile(fileInput, largeMessageSize); + + System.out.println("File created at: " + fileInput.getAbsolutePath()); + + ClientMessage message = session.createMessage(ClientMessage.BYTES_TYPE, true); + + FileInputStream fileInputStream = new FileInputStream(fileInput); + BufferedInputStream bufferedInput = new BufferedInputStream(fileInputStream); + + message.setBodyInputStream(bufferedInput); + + return message; + } + + private static void createFile(final File file, final long fileSize) throws IOException + { + if (file.exists()) + { + System.out.println("---file already there " + file.length()); + return; + } + FileOutputStream fileOut = new FileOutputStream(file); + BufferedOutputStream buffOut = new BufferedOutputStream(fileOut); + byte[] outBuffer = new byte[1024 * 1024]; + System.out.println(" --- creating file, size: " + fileSize); + for (long i = 0; i < fileSize; i += outBuffer.length) + { + buffOut.write(outBuffer); + } + buffOut.close(); + } + @Test public void testNullForwardingAddress() throws Exception { diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/tests/integration/cluster/util/BackupSyncDelay.java b/tests/integration-tests/src/test/java/org/apache/activemq/tests/integration/cluster/util/BackupSyncDelay.java index 710c4d3658..95506d58a6 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/tests/integration/cluster/util/BackupSyncDelay.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/tests/integration/cluster/util/BackupSyncDelay.java @@ -375,6 +375,12 @@ public class BackupSyncDelay implements Interceptor throw new UnsupportedOperationException(); } + @Override + public boolean largeServerCheck(long timeout) + { + return true; + } + @Override public boolean supports(byte packetID) { From ada112a6a37dce8ddf48e2238904421b2ca8e0dc Mon Sep 17 00:00:00 2001 From: Clebert Suconic Date: Wed, 22 Apr 2015 21:44:28 -0400 Subject: [PATCH 3/3] ACTIVEMQ6-94: Using proper flow control on very large messages over the bridge This will remove some of the verifications written by Howard on his commit. I did this to simplify the flow control This closes #197 --- .../config/ActiveMQDefaultConfiguration.java | 14 ++++ .../activemq/core/protocol/core/Channel.java | 11 --- .../core/impl/ActiveMQSessionContext.java | 8 --- .../core/protocol/core/impl/ChannelImpl.java | 70 +++++-------------- .../core/config/BridgeConfiguration.java | 15 ++++ .../ClusterConnectionConfiguration.java | 14 ++++ .../impl/FileConfigurationParser.java | 12 +++- .../core/server/cluster/ClusterManager.java | 5 +- .../cluster/impl/ClusterConnectionImpl.java | 11 ++- .../schema/activemq-configuration.xsd | 20 +++++- .../config/impl/FileConfigurationTest.java | 4 ++ .../ConfigurationTest-full-config.xml | 38 +++++----- docs/user-manual/en/clusters.md | 4 ++ docs/user-manual/en/configuration-index.md | 2 + docs/user-manual/en/core-bridges.md | 6 ++ .../cluster/bridge/BridgeTest.java | 36 +++------- .../cluster/util/BackupSyncDelay.java | 6 -- 17 files changed, 146 insertions(+), 130 deletions(-) diff --git a/activemq-core-client/src/main/java/org/apache/activemq/api/config/ActiveMQDefaultConfiguration.java b/activemq-core-client/src/main/java/org/apache/activemq/api/config/ActiveMQDefaultConfiguration.java index 4049fb4ae8..cea76338db 100644 --- a/activemq-core-client/src/main/java/org/apache/activemq/api/config/ActiveMQDefaultConfiguration.java +++ b/activemq-core-client/src/main/java/org/apache/activemq/api/config/ActiveMQDefaultConfiguration.java @@ -309,6 +309,10 @@ public final class ActiveMQDefaultConfiguration // Once the bridge has received this many bytes, it sends a confirmation private static int DEFAULT_BRIDGE_CONFIRMATION_WINDOW_SIZE = 1048576; + // Producer flow control is disabled by default on the bridge + // You probably need to enable this if you use lots of huge messages + private static int DEFAULT_BRIDGE_PRODUCER_WINDOW_SIZE = -1; + // Upon reconnection this configures the number of time the same node on the topology will be retried before reseting the server locator and using the initial connectors private static int DEFAULT_BRIDGE_CONNECT_SAME_NODE = 10; @@ -870,6 +874,16 @@ public final class ActiveMQDefaultConfiguration return DEFAULT_BRIDGE_CONFIRMATION_WINDOW_SIZE; } + + /** + * This default is used for both bridge and cluster connections (since they both translate to bridges) * + * @return + */ + public static int getDefaultBridgeProducerWindowSize() + { + return DEFAULT_BRIDGE_PRODUCER_WINDOW_SIZE; + } + /** * Upon reconnection this configures the number of time the same node on the topology will be retried before reseting the server locator and using the initial connectors */ diff --git a/activemq-core-client/src/main/java/org/apache/activemq/core/protocol/core/Channel.java b/activemq-core-client/src/main/java/org/apache/activemq/core/protocol/core/Channel.java index c876419462..75469522c5 100644 --- a/activemq-core-client/src/main/java/org/apache/activemq/core/protocol/core/Channel.java +++ b/activemq-core-client/src/main/java/org/apache/activemq/core/protocol/core/Channel.java @@ -201,15 +201,4 @@ public interface Channel * @param transferring whether the channel is transferring */ void setTransferring(boolean transferring); - - /** - * for large message server send, each entry in resend cache will hold a reference to - * a chunk of bytes which can cause OOM if the cache quickly build up. This method - * make sure the resent cache size can't be more than one by blocking the call. - * - * @param timeout max waiting time for the resend cache - * - * @return true if the resend cache gets cleared - */ - boolean largeServerCheck(long timeout); } diff --git a/activemq-core-client/src/main/java/org/apache/activemq/core/protocol/core/impl/ActiveMQSessionContext.java b/activemq-core-client/src/main/java/org/apache/activemq/core/protocol/core/impl/ActiveMQSessionContext.java index a3d532da44..7097bd2aa0 100644 --- a/activemq-core-client/src/main/java/org/apache/activemq/core/protocol/core/impl/ActiveMQSessionContext.java +++ b/activemq-core-client/src/main/java/org/apache/activemq/core/protocol/core/impl/ActiveMQSessionContext.java @@ -461,14 +461,6 @@ public class ActiveMQSessionContext extends SessionContext else { sessionChannel.send(chunkPacket); - if (!sessionChannel.largeServerCheck(MAX_RESENDCACHE_WAITING_TIME)) - { - ActiveMQClientLogger.LOGGER.warn("Bridge detected that the target server is slow to " + - " send back chunk confirmations. It 's possible the bridge may take more memory" + - " during sending of a large message. It may be a temporary situation if this warning" + - " occasionally shows up."); - } - } return chunkPacket.getPacketSize(); diff --git a/activemq-core-client/src/main/java/org/apache/activemq/core/protocol/core/impl/ChannelImpl.java b/activemq-core-client/src/main/java/org/apache/activemq/core/protocol/core/impl/ChannelImpl.java index 25f6f817d7..a71aed562a 100644 --- a/activemq-core-client/src/main/java/org/apache/activemq/core/protocol/core/impl/ChannelImpl.java +++ b/activemq-core-client/src/main/java/org/apache/activemq/core/protocol/core/impl/ChannelImpl.java @@ -226,27 +226,6 @@ public final class ChannelImpl implements Channel this.transferring = transferring; } - @Override - public boolean largeServerCheck(long timeout) - { - if (resendCache == null) return true; - - synchronized (resendCache) - { - if (resendCache.size() >= 1) - { - try - { - resendCache.wait(timeout); - } - catch (InterruptedException e) - { - } - } - } - return resendCache.size() == 0; - } - // This must never called by more than one thread concurrently public boolean send(final Packet packet, final boolean flush, final boolean batch) { @@ -628,12 +607,7 @@ public final class ChannelImpl implements Channel firstStoredCommandID = 0; - synchronized (resendCache) - { - resendCache.clear(); - resendCache.notifyAll(); - } - + resendCache.clear(); } } @@ -698,38 +672,28 @@ public final class ChannelImpl implements Channel int sizeToFree = 0; - try + for (int i = 0; i < numberToClear; i++) { - for (int i = 0; i < numberToClear; i++) + final Packet packet = resendCache.poll(); + + if (packet == null) { - final Packet packet = resendCache.poll(); - - if (packet == null) + if (lastReceivedCommandID > 0) { - if (lastReceivedCommandID > 0) - { - ActiveMQClientLogger.LOGGER.cannotFindPacketToClear(lastReceivedCommandID, firstStoredCommandID); - } - firstStoredCommandID = lastReceivedCommandID + 1; - return; - } - - if (packet.getType() != PacketImpl.PACKETS_CONFIRMED) - { - sizeToFree += packet.getPacketSize(); - } - - if (commandConfirmationHandler != null) - { - commandConfirmationHandler.commandConfirmed(packet); + ActiveMQClientLogger.LOGGER.cannotFindPacketToClear(lastReceivedCommandID, firstStoredCommandID); } + firstStoredCommandID = lastReceivedCommandID + 1; + return; } - } - finally - { - synchronized (resendCache) + + if (packet.getType() != PacketImpl.PACKETS_CONFIRMED) { - resendCache.notifyAll(); + sizeToFree += packet.getPacketSize(); + } + + if (commandConfirmationHandler != null) + { + commandConfirmationHandler.commandConfirmed(packet); } } diff --git a/activemq-server/src/main/java/org/apache/activemq/core/config/BridgeConfiguration.java b/activemq-server/src/main/java/org/apache/activemq/core/config/BridgeConfiguration.java index 87fae42afa..65cb2a4866 100644 --- a/activemq-server/src/main/java/org/apache/activemq/core/config/BridgeConfiguration.java +++ b/activemq-server/src/main/java/org/apache/activemq/core/config/BridgeConfiguration.java @@ -56,6 +56,8 @@ public final class BridgeConfiguration implements Serializable private int confirmationWindowSize = ActiveMQClient.DEFAULT_CONFIRMATION_WINDOW_SIZE; + private int producerWindowSize = ActiveMQDefaultConfiguration.getDefaultBridgeProducerWindowSize(); + private long clientFailureCheckPeriod = ActiveMQClient.DEFAULT_CLIENT_FAILURE_CHECK_PERIOD; private String user = ActiveMQDefaultConfiguration.getDefaultClusterUser(); @@ -302,6 +304,19 @@ public final class BridgeConfiguration implements Serializable return this; } + /** The producer flow control on the birdge */ + public BridgeConfiguration setProducerWindowSize(final int producerWindowSize) + { + this.producerWindowSize = producerWindowSize; + return this; + } + + public int getProducerWindowSize() + { + return producerWindowSize; + + } + public long getClientFailureCheckPeriod() { return clientFailureCheckPeriod; diff --git a/activemq-server/src/main/java/org/apache/activemq/core/config/ClusterConnectionConfiguration.java b/activemq-server/src/main/java/org/apache/activemq/core/config/ClusterConnectionConfiguration.java index 3a6c3152f5..ea95dd8f86 100644 --- a/activemq-server/src/main/java/org/apache/activemq/core/config/ClusterConnectionConfiguration.java +++ b/activemq-server/src/main/java/org/apache/activemq/core/config/ClusterConnectionConfiguration.java @@ -63,6 +63,8 @@ public final class ClusterConnectionConfiguration implements Serializable private int confirmationWindowSize = ActiveMQDefaultConfiguration.getDefaultClusterConfirmationWindowSize(); + private int producerWindowSize = ActiveMQDefaultConfiguration.getDefaultBridgeProducerWindowSize(); + private boolean allowDirectConnectionsOnly = false; private int minLargeMessageSize = ActiveMQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE; @@ -198,6 +200,18 @@ public final class ClusterConnectionConfiguration implements Serializable return this; } + + public int getProducerWindowSize() + { + return producerWindowSize; + } + + public ClusterConnectionConfiguration setProducerindowSize(int producerWindowSize) + { + this.producerWindowSize = producerWindowSize; + return this; + } + public List getStaticConnectors() { return staticConnectors; diff --git a/activemq-server/src/main/java/org/apache/activemq/core/deployers/impl/FileConfigurationParser.java b/activemq-server/src/main/java/org/apache/activemq/core/deployers/impl/FileConfigurationParser.java index 775e60cbcf..3f7c637531 100644 --- a/activemq-server/src/main/java/org/apache/activemq/core/deployers/impl/FileConfigurationParser.java +++ b/activemq-server/src/main/java/org/apache/activemq/core/deployers/impl/FileConfigurationParser.java @@ -1417,6 +1417,10 @@ public final class FileConfigurationParser extends XMLConfigurationUtil getInteger(e, "confirmation-window-size", ActiveMQDefaultConfiguration.getDefaultClusterConfirmationWindowSize(), Validators.GT_ZERO); + int producerWindowSize = + getInteger(e, "producer-window-size", ActiveMQDefaultConfiguration.getDefaultBridgeConfirmationWindowSize(), + Validators.MINUS_ONE_OR_GT_ZERO); + long clusterNotificationInterval = getLong(e, "notification-interval", ActiveMQDefaultConfiguration.getDefaultClusterNotificationInterval(), Validators.GT_ZERO); int clusterNotificationAttempts = getInteger(e, "notification-attempts", ActiveMQDefaultConfiguration.getDefaultClusterNotificationAttempts(), Validators.GT_ZERO); @@ -1468,6 +1472,7 @@ public final class FileConfigurationParser extends XMLConfigurationUtil .setForwardWhenNoConsumers(forwardWhenNoConsumers) .setMaxHops(maxHops) .setConfirmationWindowSize(confirmationWindowSize) + .setProducerindowSize(producerWindowSize) .setAllowDirectConnectionsOnly(allowDirectConnectionsOnly) .setClusterNotificationInterval(clusterNotificationInterval) .setClusterNotificationAttempts(clusterNotificationAttempts); @@ -1549,6 +1554,10 @@ public final class FileConfigurationParser extends XMLConfigurationUtil getInteger(brNode, "reconnect-attempts-same-node", ActiveMQDefaultConfiguration.getDefaultBridgeConnectSameNode(), Validators.MINUS_ONE_OR_GE_ZERO); + int producerWindowSize = + getInteger(brNode, "producer-window-size", ActiveMQDefaultConfiguration.getDefaultBridgeProducerWindowSize(), + Validators.MINUS_ONE_OR_GE_ZERO); + boolean useDuplicateDetection = getBoolean(brNode, "use-duplicate-detection", ActiveMQDefaultConfiguration.isDefaultBridgeDuplicateDetection()); @@ -1630,7 +1639,8 @@ public final class FileConfigurationParser extends XMLConfigurationUtil .setConfirmationWindowSize(confirmationWindowSize) .setHA(ha) .setUser(user) - .setPassword(password); + .setPassword(password) + .setProducerWindowSize(producerWindowSize); if (!staticConnectorNames.isEmpty()) { diff --git a/activemq-server/src/main/java/org/apache/activemq/core/server/cluster/ClusterManager.java b/activemq-server/src/main/java/org/apache/activemq/core/server/cluster/ClusterManager.java index 24ed7c0127..a289e42f94 100644 --- a/activemq-server/src/main/java/org/apache/activemq/core/server/cluster/ClusterManager.java +++ b/activemq-server/src/main/java/org/apache/activemq/core/server/cluster/ClusterManager.java @@ -529,8 +529,7 @@ public final class ClusterManager implements ActiveMQComponent serverLocator.setBlockOnDurableSend(!config.isUseDuplicateDetection()); serverLocator.setBlockOnNonDurableSend(!config.isUseDuplicateDetection()); serverLocator.setMinLargeMessageSize(config.getMinLargeMessageSize()); - //disable flow control - serverLocator.setProducerWindowSize(-1); + serverLocator.setProducerWindowSize(config.getProducerWindowSize()); // This will be set to 30s unless it's changed from embedded / testing // there is no reason to exception the config for this timeout @@ -735,6 +734,7 @@ public final class ClusterManager implements ActiveMQComponent config.isDuplicateDetection(), config.isForwardWhenNoConsumers(), config.getConfirmationWindowSize(), + config.getProducerWindowSize(), executorFactory, server, postOffice, @@ -777,6 +777,7 @@ public final class ClusterManager implements ActiveMQComponent config.isDuplicateDetection(), config.isForwardWhenNoConsumers(), config.getConfirmationWindowSize(), + config.getProducerWindowSize(), executorFactory, server, postOffice, diff --git a/activemq-server/src/main/java/org/apache/activemq/core/server/cluster/impl/ClusterConnectionImpl.java b/activemq-server/src/main/java/org/apache/activemq/core/server/cluster/impl/ClusterConnectionImpl.java index 9ff99485f2..c02ed572e4 100644 --- a/activemq-server/src/main/java/org/apache/activemq/core/server/cluster/impl/ClusterConnectionImpl.java +++ b/activemq-server/src/main/java/org/apache/activemq/core/server/cluster/impl/ClusterConnectionImpl.java @@ -113,6 +113,8 @@ public final class ClusterConnectionImpl implements ClusterConnection, AfterConn private final int confirmationWindowSize; + private final int producerWindowSize; + /** * Guard for the field {@link #records}. Note that the field is {@link ConcurrentHashMap}, * however we need the guard to synchronize multiple step operations during topology updates. @@ -179,6 +181,7 @@ public final class ClusterConnectionImpl implements ClusterConnection, AfterConn final boolean useDuplicateDetection, final boolean routeWhenNoConsumers, final int confirmationWindowSize, + final int producerWindowSize, final ExecutorFactory executorFactory, final ActiveMQServer server, final PostOffice postOffice, @@ -220,6 +223,8 @@ public final class ClusterConnectionImpl implements ClusterConnection, AfterConn this.confirmationWindowSize = confirmationWindowSize; + this.producerWindowSize = producerWindowSize; + this.executorFactory = executorFactory; this.clusterNotificationInterval = clusterNotificationInterval; @@ -286,6 +291,7 @@ public final class ClusterConnectionImpl implements ClusterConnection, AfterConn final boolean useDuplicateDetection, final boolean routeWhenNoConsumers, final int confirmationWindowSize, + final int producerWindowSize, final ExecutorFactory executorFactory, final ActiveMQServer server, final PostOffice postOffice, @@ -333,6 +339,8 @@ public final class ClusterConnectionImpl implements ClusterConnection, AfterConn this.confirmationWindowSize = confirmationWindowSize; + this.producerWindowSize = producerWindowSize; + this.executorFactory = executorFactory; this.clusterNotificationInterval = clusterNotificationInterval; @@ -637,8 +645,7 @@ public final class ClusterConnectionImpl implements ClusterConnection, AfterConn serverLocator.setBlockOnNonDurableSend(!useDuplicateDetection); serverLocator.setCallTimeout(callTimeout); serverLocator.setCallFailoverTimeout(callFailoverTimeout); - // No producer flow control on the bridges, as we don't want to lock the queues - serverLocator.setProducerWindowSize(-1); + serverLocator.setProducerWindowSize(producerWindowSize); if (retryInterval > 0) { diff --git a/activemq-server/src/main/resources/schema/activemq-configuration.xsd b/activemq-server/src/main/resources/schema/activemq-configuration.xsd index 10ab978d70..31b840cb60 100644 --- a/activemq-server/src/main/resources/schema/activemq-configuration.xsd +++ b/activemq-server/src/main/resources/schema/activemq-configuration.xsd @@ -1,4 +1,4 @@ - +