diff --git a/activemq-core-client/src/main/java/org/apache/activemq/core/client/impl/ClientProducerImpl.java b/activemq-core-client/src/main/java/org/apache/activemq/core/client/impl/ClientProducerImpl.java index 7623cdb1a8..3ff5faee8f 100644 --- a/activemq-core-client/src/main/java/org/apache/activemq/core/client/impl/ClientProducerImpl.java +++ b/activemq-core-client/src/main/java/org/apache/activemq/core/client/impl/ClientProducerImpl.java @@ -415,11 +415,11 @@ public class ClientProducerImpl implements ClientProducerInternal try { - for (int pos = 0; pos < bodySize; ) + for (long pos = 0; pos < bodySize; ) { final boolean lastChunk; - final int chunkLength = Math.min((int) (bodySize - pos), minLargeMessageSize); + final int chunkLength = (int)Math.min((bodySize - pos), (long)minLargeMessageSize); final ActiveMQBuffer bodyBuffer = ActiveMQBuffers.fixedBuffer(chunkLength); @@ -430,7 +430,7 @@ public class ClientProducerImpl implements ClientProducerInternal lastChunk = pos >= bodySize; SendAcknowledgementHandler messageHandler = lastChunk ? handler : null; - int creditsUsed = sessionContext.sendLargeMessageChunk(msgI, -1, sendBlocking, lastChunk, bodyBuffer.toByteBuffer().array(), messageHandler); + int creditsUsed = sessionContext.sendServerLargeMessageChunk(msgI, -1, sendBlocking, lastChunk, bodyBuffer.toByteBuffer().array(), messageHandler); try { diff --git a/activemq-core-client/src/main/java/org/apache/activemq/core/protocol/core/Channel.java b/activemq-core-client/src/main/java/org/apache/activemq/core/protocol/core/Channel.java index 75469522c5..c876419462 100644 --- a/activemq-core-client/src/main/java/org/apache/activemq/core/protocol/core/Channel.java +++ b/activemq-core-client/src/main/java/org/apache/activemq/core/protocol/core/Channel.java @@ -201,4 +201,15 @@ public interface Channel * @param transferring whether the channel is transferring */ void setTransferring(boolean transferring); + + /** + * for large message server send, each entry in resend cache will hold a reference to + * a chunk of bytes which can cause OOM if the cache quickly build up. This method + * make sure the resent cache size can't be more than one by blocking the call. + * + * @param timeout max waiting time for the resend cache + * + * @return true if the resend cache gets cleared + */ + boolean largeServerCheck(long timeout); } diff --git a/activemq-core-client/src/main/java/org/apache/activemq/core/protocol/core/impl/ActiveMQSessionContext.java b/activemq-core-client/src/main/java/org/apache/activemq/core/protocol/core/impl/ActiveMQSessionContext.java index 8df00114cb..a3d532da44 100644 --- a/activemq-core-client/src/main/java/org/apache/activemq/core/protocol/core/impl/ActiveMQSessionContext.java +++ b/activemq-core-client/src/main/java/org/apache/activemq/core/protocol/core/impl/ActiveMQSessionContext.java @@ -109,6 +109,7 @@ import static org.apache.activemq.core.protocol.core.impl.PacketImpl.SESS_RECEIV public class ActiveMQSessionContext extends SessionContext { + private static final long MAX_RESENDCACHE_WAITING_TIME = 10000L;//10 sec private final Channel sessionChannel; private final int serverVersion; private int confirmationWindow; @@ -425,6 +426,27 @@ public class ActiveMQSessionContext extends SessionContext @Override public int sendLargeMessageChunk(MessageInternal msgI, long messageBodySize, boolean sendBlocking, boolean lastChunk, byte[] chunk, SendAcknowledgementHandler messageHandler) throws ActiveMQException + { + final boolean requiresResponse = lastChunk && sendBlocking; + final SessionSendContinuationMessage chunkPacket = + new SessionSendContinuationMessage(msgI, chunk, !lastChunk, + requiresResponse, messageBodySize, messageHandler); + + if (requiresResponse) + { + // When sending it blocking, only the last chunk will be blocking. + sessionChannel.sendBlocking(chunkPacket, PacketImpl.NULL_RESPONSE); + } + else + { + sessionChannel.send(chunkPacket); + } + + return chunkPacket.getPacketSize(); + } + + @Override + public int sendServerLargeMessageChunk(MessageInternal msgI, long messageBodySize, boolean sendBlocking, boolean lastChunk, byte[] chunk, SendAcknowledgementHandler messageHandler) throws ActiveMQException { final boolean requiresResponse = lastChunk && sendBlocking; final SessionSendContinuationMessage chunkPacket = @@ -439,6 +461,14 @@ public class ActiveMQSessionContext extends SessionContext else { sessionChannel.send(chunkPacket); + if (!sessionChannel.largeServerCheck(MAX_RESENDCACHE_WAITING_TIME)) + { + ActiveMQClientLogger.LOGGER.warn("Bridge detected that the target server is slow to " + + " send back chunk confirmations. It 's possible the bridge may take more memory" + + " during sending of a large message. It may be a temporary situation if this warning" + + " occasionally shows up."); + } + } return chunkPacket.getPacketSize(); diff --git a/activemq-core-client/src/main/java/org/apache/activemq/core/protocol/core/impl/ChannelImpl.java b/activemq-core-client/src/main/java/org/apache/activemq/core/protocol/core/impl/ChannelImpl.java index a71aed562a..25f6f817d7 100644 --- a/activemq-core-client/src/main/java/org/apache/activemq/core/protocol/core/impl/ChannelImpl.java +++ b/activemq-core-client/src/main/java/org/apache/activemq/core/protocol/core/impl/ChannelImpl.java @@ -226,6 +226,27 @@ public final class ChannelImpl implements Channel this.transferring = transferring; } + @Override + public boolean largeServerCheck(long timeout) + { + if (resendCache == null) return true; + + synchronized (resendCache) + { + if (resendCache.size() >= 1) + { + try + { + resendCache.wait(timeout); + } + catch (InterruptedException e) + { + } + } + } + return resendCache.size() == 0; + } + // This must never called by more than one thread concurrently public boolean send(final Packet packet, final boolean flush, final boolean batch) { @@ -607,7 +628,12 @@ public final class ChannelImpl implements Channel firstStoredCommandID = 0; - resendCache.clear(); + synchronized (resendCache) + { + resendCache.clear(); + resendCache.notifyAll(); + } + } } @@ -672,28 +698,38 @@ public final class ChannelImpl implements Channel int sizeToFree = 0; - for (int i = 0; i < numberToClear; i++) + try { - final Packet packet = resendCache.poll(); - - if (packet == null) + for (int i = 0; i < numberToClear; i++) { - if (lastReceivedCommandID > 0) + final Packet packet = resendCache.poll(); + + if (packet == null) { - ActiveMQClientLogger.LOGGER.cannotFindPacketToClear(lastReceivedCommandID, firstStoredCommandID); + if (lastReceivedCommandID > 0) + { + ActiveMQClientLogger.LOGGER.cannotFindPacketToClear(lastReceivedCommandID, firstStoredCommandID); + } + firstStoredCommandID = lastReceivedCommandID + 1; + return; } - firstStoredCommandID = lastReceivedCommandID + 1; - return; - } - if (packet.getType() != PacketImpl.PACKETS_CONFIRMED) - { - sizeToFree += packet.getPacketSize(); - } + if (packet.getType() != PacketImpl.PACKETS_CONFIRMED) + { + sizeToFree += packet.getPacketSize(); + } - if (commandConfirmationHandler != null) + if (commandConfirmationHandler != null) + { + commandConfirmationHandler.commandConfirmed(packet); + } + } + } + finally + { + synchronized (resendCache) { - commandConfirmationHandler.commandConfirmed(packet); + resendCache.notifyAll(); } } diff --git a/activemq-core-client/src/main/java/org/apache/activemq/spi/core/remoting/SessionContext.java b/activemq-core-client/src/main/java/org/apache/activemq/spi/core/remoting/SessionContext.java index af0d4f3bae..c642837b21 100644 --- a/activemq-core-client/src/main/java/org/apache/activemq/spi/core/remoting/SessionContext.java +++ b/activemq-core-client/src/main/java/org/apache/activemq/spi/core/remoting/SessionContext.java @@ -149,6 +149,8 @@ public abstract class SessionContext public abstract int sendLargeMessageChunk(MessageInternal msgI, long messageBodySize, boolean sendBlocking, boolean lastChunk, byte[] chunk, SendAcknowledgementHandler messageHandler) throws ActiveMQException; + public abstract int sendServerLargeMessageChunk(MessageInternal msgI, long messageBodySize, boolean sendBlocking, boolean lastChunk, byte[] chunk, SendAcknowledgementHandler messageHandler) throws ActiveMQException; + public abstract void setSendAcknowledgementHandler(final SendAcknowledgementHandler handler); diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/tests/integration/cluster/bridge/BridgeTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/tests/integration/cluster/bridge/BridgeTest.java index f9ac02425d..ff4adf3aac 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/tests/integration/cluster/bridge/BridgeTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/tests/integration/cluster/bridge/BridgeTest.java @@ -16,6 +16,12 @@ */ package org.apache.activemq.tests.integration.cluster.bridge; +import java.io.BufferedInputStream; +import java.io.BufferedOutputStream; +import java.io.File; +import java.io.FileInputStream; +import java.io.FileOutputStream; +import java.io.IOException; import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; @@ -1753,6 +1759,234 @@ public class BridgeTest extends ServiceTestBase assertEquals(0, loadQueues(server0).size()); } + @Test + public void testBridgeWithVeryLargeMessage() throws Exception + { + ActiveMQServer server0 = null; + ActiveMQServer server1 = null; + + final int PAGE_MAX = 1024 * 1024; + + final int PAGE_SIZE = 10 * 1024; + ServerLocator locator = null; + try + { + + Map server0Params = new HashMap(); + server0 = createClusteredServerWithParams(isNetty(), 0, true, PAGE_SIZE, PAGE_MAX, server0Params); + + Map server1Params = new HashMap(); + addTargetParameters(server1Params); + server1 = createClusteredServerWithParams(isNetty(), 1, true, server1Params); + + final String testAddress = "testAddress"; + final String queueName0 = "queue0"; + final String forwardAddress = "forwardAddress"; + final String queueName1 = "queue1"; + + Map connectors = new HashMap(); + TransportConfiguration server0tc = new TransportConfiguration(getConnector(), server0Params); + + TransportConfiguration server1tc = new TransportConfiguration(getConnector(), server1Params); + connectors.put(server1tc.getName(), server1tc); + + server0.getConfiguration().setConnectorConfigurations(connectors); + + ArrayList staticConnectors = new ArrayList(); + staticConnectors.add(server1tc.getName()); + + int minLargeMessageSize = 50 * 1024 * 1024; //50M + + BridgeConfiguration bridgeConfiguration = new BridgeConfiguration() + .setName("bridge1") + .setQueueName(queueName0) + .setForwardingAddress(forwardAddress) + .setRetryInterval(1000) + .setReconnectAttemptsOnSameNode(-1) + .setUseDuplicateDetection(false) + .setConfirmationWindowSize(1024) + .setStaticConnectors(staticConnectors) + .setMinLargeMessageSize(minLargeMessageSize); + + List bridgeConfigs = new ArrayList(); + bridgeConfigs.add(bridgeConfiguration); + server0.getConfiguration().setBridgeConfigurations(bridgeConfigs); + + CoreQueueConfiguration queueConfig0 = new CoreQueueConfiguration() + .setAddress(testAddress) + .setName(queueName0); + List queueConfigs0 = new ArrayList(); + queueConfigs0.add(queueConfig0); + server0.getConfiguration().setQueueConfigurations(queueConfigs0); + + + CoreQueueConfiguration queueConfig1 = new CoreQueueConfiguration() + .setAddress(forwardAddress) + .setName(queueName1); + List queueConfigs1 = new ArrayList(); + queueConfigs1.add(queueConfig1); + server1.getConfiguration().setQueueConfigurations(queueConfigs1); + + server1.start(); + server0.start(); + + locator = addServerLocator(ActiveMQClient.createServerLocatorWithoutHA(server0tc, server1tc)); + + ClientSessionFactory sf0 = locator.createSessionFactory(server0tc); + + ClientSessionFactory sf1 = locator.createSessionFactory(server1tc); + + ClientSession session0 = sf0.createSession(false, true, true); + + ClientSession session1 = sf1.createSession(false, true, true); + + ClientProducer producer0 = session0.createProducer(new SimpleString(testAddress)); + + ClientConsumer consumer1 = session1.createConsumer(queueName1); + + session1.start(); + + //create a large message bigger than Integer.MAX_VALUE + final long largeMessageSize = 3L * 1024L * 1024L * 1024L; + + File destDir = createDestDir("testBridgeWithVeryLargeMessage"); + ClientMessage largeMessage = createLargeMessage(session0, largeMessageSize, destDir); + + producer0.send(largeMessage); + + session0.commit(); + + //check target queue for large message arriving + ClientSession.QueueQuery query = session1.queueQuery(new SimpleString(queueName1)); + long messageCount = query.getMessageCount(); + int count = 0; + //wait for 300 sec max + while (messageCount == 0 && count < 300) + { + count++; + Thread.sleep(1000); + query = session1.queueQuery(new SimpleString(queueName1)); + messageCount = query.getMessageCount(); + } + + if (messageCount == 0) + { + fail("large message didn't arrived after 5 min!"); + } + + //receive the message + ClientMessage message = consumer1.receive(5000); + message.acknowledge(); + + File outputFile = new File(destDir, "huge_message_received.dat"); + + System.out.println("-----message save to: " + outputFile.getAbsolutePath()); + FileOutputStream fileOutputStream = new FileOutputStream(outputFile); + + BufferedOutputStream bufferedOutput = new BufferedOutputStream(fileOutputStream); + + message.setOutputStream(bufferedOutput); + + if (!message.waitOutputStreamCompletion(5 * 60 * 1000)) + { + fail("message didn't get received to disk in 5 min. Is the machine slow?"); + } + session1.commit(); + + Assert.assertNull(consumer1.receiveImmediate()); + + session0.close(); + + session1.close(); + + sf0.close(); + + sf1.close(); + + } + finally + { + if (locator != null) + { + locator.close(); + } + try + { + server0.stop(); + } + catch (Throwable ignored) + { + } + + try + { + server1.stop(); + } + catch (Throwable ignored) + { + } + } + + assertEquals(0, loadQueues(server0).size()); + } + + private File createDestDir(String dirName) + { + File clientDir = new File(getClientLargeMessagesDir()); + if (!clientDir.exists()) + { + if (!clientDir.mkdirs()) + { + throw new IllegalStateException("Can't create dir " + clientDir.getAbsolutePath()); + } + } + + File destDir = new File(clientDir, dirName); + if (!destDir.mkdir()) + { + throw new IllegalStateException("Can't create dir " + destDir.getAbsolutePath()); + } + return destDir; + } + + + private ClientMessage createLargeMessage(ClientSession session, long largeMessageSize, File destDir) throws Exception + { + + File fileInput = new File(destDir, "huge_message_to_send.dat"); + + createFile(fileInput, largeMessageSize); + + System.out.println("File created at: " + fileInput.getAbsolutePath()); + + ClientMessage message = session.createMessage(ClientMessage.BYTES_TYPE, true); + + FileInputStream fileInputStream = new FileInputStream(fileInput); + BufferedInputStream bufferedInput = new BufferedInputStream(fileInputStream); + + message.setBodyInputStream(bufferedInput); + + return message; + } + + private static void createFile(final File file, final long fileSize) throws IOException + { + if (file.exists()) + { + System.out.println("---file already there " + file.length()); + return; + } + FileOutputStream fileOut = new FileOutputStream(file); + BufferedOutputStream buffOut = new BufferedOutputStream(fileOut); + byte[] outBuffer = new byte[1024 * 1024]; + System.out.println(" --- creating file, size: " + fileSize); + for (long i = 0; i < fileSize; i += outBuffer.length) + { + buffOut.write(outBuffer); + } + buffOut.close(); + } + @Test public void testNullForwardingAddress() throws Exception { diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/tests/integration/cluster/util/BackupSyncDelay.java b/tests/integration-tests/src/test/java/org/apache/activemq/tests/integration/cluster/util/BackupSyncDelay.java index 710c4d3658..95506d58a6 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/tests/integration/cluster/util/BackupSyncDelay.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/tests/integration/cluster/util/BackupSyncDelay.java @@ -375,6 +375,12 @@ public class BackupSyncDelay implements Interceptor throw new UnsupportedOperationException(); } + @Override + public boolean largeServerCheck(long timeout) + { + return true; + } + @Override public boolean supports(byte packetID) {