ACTIVEMQ6-94: HornetQ Bridge does not handle large messages

When sending a large message that exceeds the size of
Integer.MAX_VALUE, the bridge will get negative chunk size during
fowarding. And the resend cache is not limited so there is a
potential that it may get OutOfMemory exception.
This commit is contained in:
Howard Gao 2015-03-30 14:09:34 +08:00 committed by Clebert Suconic
parent 147a5528e4
commit c1111cc156
7 changed files with 338 additions and 19 deletions

View File

@ -415,11 +415,11 @@ public class ClientProducerImpl implements ClientProducerInternal
try
{
for (int pos = 0; pos < bodySize; )
for (long pos = 0; pos < bodySize; )
{
final boolean lastChunk;
final int chunkLength = Math.min((int) (bodySize - pos), minLargeMessageSize);
final int chunkLength = (int)Math.min((bodySize - pos), (long)minLargeMessageSize);
final ActiveMQBuffer bodyBuffer = ActiveMQBuffers.fixedBuffer(chunkLength);
@ -430,7 +430,7 @@ public class ClientProducerImpl implements ClientProducerInternal
lastChunk = pos >= bodySize;
SendAcknowledgementHandler messageHandler = lastChunk ? handler : null;
int creditsUsed = sessionContext.sendLargeMessageChunk(msgI, -1, sendBlocking, lastChunk, bodyBuffer.toByteBuffer().array(), messageHandler);
int creditsUsed = sessionContext.sendServerLargeMessageChunk(msgI, -1, sendBlocking, lastChunk, bodyBuffer.toByteBuffer().array(), messageHandler);
try
{

View File

@ -201,4 +201,15 @@ public interface Channel
* @param transferring whether the channel is transferring
*/
void setTransferring(boolean transferring);
/**
* for large message server send, each entry in resend cache will hold a reference to
* a chunk of bytes which can cause OOM if the cache quickly build up. This method
* make sure the resent cache size can't be more than one by blocking the call.
*
* @param timeout max waiting time for the resend cache
*
* @return true if the resend cache gets cleared
*/
boolean largeServerCheck(long timeout);
}

View File

@ -109,6 +109,7 @@ import static org.apache.activemq.core.protocol.core.impl.PacketImpl.SESS_RECEIV
public class ActiveMQSessionContext extends SessionContext
{
private static final long MAX_RESENDCACHE_WAITING_TIME = 10000L;//10 sec
private final Channel sessionChannel;
private final int serverVersion;
private int confirmationWindow;
@ -444,6 +445,35 @@ public class ActiveMQSessionContext extends SessionContext
return chunkPacket.getPacketSize();
}
@Override
public int sendServerLargeMessageChunk(MessageInternal msgI, long messageBodySize, boolean sendBlocking, boolean lastChunk, byte[] chunk, SendAcknowledgementHandler messageHandler) throws ActiveMQException
{
final boolean requiresResponse = lastChunk && sendBlocking;
final SessionSendContinuationMessage chunkPacket =
new SessionSendContinuationMessage(msgI, chunk, !lastChunk,
requiresResponse, messageBodySize, messageHandler);
if (requiresResponse)
{
// When sending it blocking, only the last chunk will be blocking.
sessionChannel.sendBlocking(chunkPacket, PacketImpl.NULL_RESPONSE);
}
else
{
sessionChannel.send(chunkPacket);
if (!sessionChannel.largeServerCheck(MAX_RESENDCACHE_WAITING_TIME))
{
ActiveMQClientLogger.LOGGER.warn("Bridge detected that the target server is slow to " +
" send back chunk confirmations. It 's possible the bridge may take more memory" +
" during sending of a large message. It may be a temporary situation if this warning" +
" occasionally shows up.");
}
}
return chunkPacket.getPacketSize();
}
public void sendACK(boolean individual, boolean block, final ClientConsumer consumer, final Message message) throws ActiveMQException
{
PacketImpl messagePacket;

View File

@ -226,6 +226,27 @@ public final class ChannelImpl implements Channel
this.transferring = transferring;
}
@Override
public boolean largeServerCheck(long timeout)
{
if (resendCache == null) return true;
synchronized (resendCache)
{
if (resendCache.size() >= 1)
{
try
{
resendCache.wait(timeout);
}
catch (InterruptedException e)
{
}
}
}
return resendCache.size() == 0;
}
// This must never called by more than one thread concurrently
public boolean send(final Packet packet, final boolean flush, final boolean batch)
{
@ -607,7 +628,12 @@ public final class ChannelImpl implements Channel
firstStoredCommandID = 0;
synchronized (resendCache)
{
resendCache.clear();
resendCache.notifyAll();
}
}
}
@ -672,6 +698,8 @@ public final class ChannelImpl implements Channel
int sizeToFree = 0;
try
{
for (int i = 0; i < numberToClear; i++)
{
final Packet packet = resendCache.poll();
@ -696,6 +724,14 @@ public final class ChannelImpl implements Channel
commandConfirmationHandler.commandConfirmed(packet);
}
}
}
finally
{
synchronized (resendCache)
{
resendCache.notifyAll();
}
}
firstStoredCommandID += numberToClear;
}

View File

@ -149,6 +149,8 @@ public abstract class SessionContext
public abstract int sendLargeMessageChunk(MessageInternal msgI, long messageBodySize, boolean sendBlocking, boolean lastChunk, byte[] chunk, SendAcknowledgementHandler messageHandler) throws ActiveMQException;
public abstract int sendServerLargeMessageChunk(MessageInternal msgI, long messageBodySize, boolean sendBlocking, boolean lastChunk, byte[] chunk, SendAcknowledgementHandler messageHandler) throws ActiveMQException;
public abstract void setSendAcknowledgementHandler(final SendAcknowledgementHandler handler);

View File

@ -16,6 +16,12 @@
*/
package org.apache.activemq.tests.integration.cluster.bridge;
import java.io.BufferedInputStream;
import java.io.BufferedOutputStream;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
@ -1753,6 +1759,234 @@ public class BridgeTest extends ServiceTestBase
assertEquals(0, loadQueues(server0).size());
}
@Test
public void testBridgeWithVeryLargeMessage() throws Exception
{
ActiveMQServer server0 = null;
ActiveMQServer server1 = null;
final int PAGE_MAX = 1024 * 1024;
final int PAGE_SIZE = 10 * 1024;
ServerLocator locator = null;
try
{
Map<String, Object> server0Params = new HashMap<String, Object>();
server0 = createClusteredServerWithParams(isNetty(), 0, true, PAGE_SIZE, PAGE_MAX, server0Params);
Map<String, Object> server1Params = new HashMap<String, Object>();
addTargetParameters(server1Params);
server1 = createClusteredServerWithParams(isNetty(), 1, true, server1Params);
final String testAddress = "testAddress";
final String queueName0 = "queue0";
final String forwardAddress = "forwardAddress";
final String queueName1 = "queue1";
Map<String, TransportConfiguration> connectors = new HashMap<String, TransportConfiguration>();
TransportConfiguration server0tc = new TransportConfiguration(getConnector(), server0Params);
TransportConfiguration server1tc = new TransportConfiguration(getConnector(), server1Params);
connectors.put(server1tc.getName(), server1tc);
server0.getConfiguration().setConnectorConfigurations(connectors);
ArrayList<String> staticConnectors = new ArrayList<String>();
staticConnectors.add(server1tc.getName());
int minLargeMessageSize = 50 * 1024 * 1024; //50M
BridgeConfiguration bridgeConfiguration = new BridgeConfiguration()
.setName("bridge1")
.setQueueName(queueName0)
.setForwardingAddress(forwardAddress)
.setRetryInterval(1000)
.setReconnectAttemptsOnSameNode(-1)
.setUseDuplicateDetection(false)
.setConfirmationWindowSize(1024)
.setStaticConnectors(staticConnectors)
.setMinLargeMessageSize(minLargeMessageSize);
List<BridgeConfiguration> bridgeConfigs = new ArrayList<BridgeConfiguration>();
bridgeConfigs.add(bridgeConfiguration);
server0.getConfiguration().setBridgeConfigurations(bridgeConfigs);
CoreQueueConfiguration queueConfig0 = new CoreQueueConfiguration()
.setAddress(testAddress)
.setName(queueName0);
List<CoreQueueConfiguration> queueConfigs0 = new ArrayList<CoreQueueConfiguration>();
queueConfigs0.add(queueConfig0);
server0.getConfiguration().setQueueConfigurations(queueConfigs0);
CoreQueueConfiguration queueConfig1 = new CoreQueueConfiguration()
.setAddress(forwardAddress)
.setName(queueName1);
List<CoreQueueConfiguration> queueConfigs1 = new ArrayList<CoreQueueConfiguration>();
queueConfigs1.add(queueConfig1);
server1.getConfiguration().setQueueConfigurations(queueConfigs1);
server1.start();
server0.start();
locator = addServerLocator(ActiveMQClient.createServerLocatorWithoutHA(server0tc, server1tc));
ClientSessionFactory sf0 = locator.createSessionFactory(server0tc);
ClientSessionFactory sf1 = locator.createSessionFactory(server1tc);
ClientSession session0 = sf0.createSession(false, true, true);
ClientSession session1 = sf1.createSession(false, true, true);
ClientProducer producer0 = session0.createProducer(new SimpleString(testAddress));
ClientConsumer consumer1 = session1.createConsumer(queueName1);
session1.start();
//create a large message bigger than Integer.MAX_VALUE
final long largeMessageSize = 3L * 1024L * 1024L * 1024L;
File destDir = createDestDir("testBridgeWithVeryLargeMessage");
ClientMessage largeMessage = createLargeMessage(session0, largeMessageSize, destDir);
producer0.send(largeMessage);
session0.commit();
//check target queue for large message arriving
ClientSession.QueueQuery query = session1.queueQuery(new SimpleString(queueName1));
long messageCount = query.getMessageCount();
int count = 0;
//wait for 300 sec max
while (messageCount == 0 && count < 300)
{
count++;
Thread.sleep(1000);
query = session1.queueQuery(new SimpleString(queueName1));
messageCount = query.getMessageCount();
}
if (messageCount == 0)
{
fail("large message didn't arrived after 5 min!");
}
//receive the message
ClientMessage message = consumer1.receive(5000);
message.acknowledge();
File outputFile = new File(destDir, "huge_message_received.dat");
System.out.println("-----message save to: " + outputFile.getAbsolutePath());
FileOutputStream fileOutputStream = new FileOutputStream(outputFile);
BufferedOutputStream bufferedOutput = new BufferedOutputStream(fileOutputStream);
message.setOutputStream(bufferedOutput);
if (!message.waitOutputStreamCompletion(5 * 60 * 1000))
{
fail("message didn't get received to disk in 5 min. Is the machine slow?");
}
session1.commit();
Assert.assertNull(consumer1.receiveImmediate());
session0.close();
session1.close();
sf0.close();
sf1.close();
}
finally
{
if (locator != null)
{
locator.close();
}
try
{
server0.stop();
}
catch (Throwable ignored)
{
}
try
{
server1.stop();
}
catch (Throwable ignored)
{
}
}
assertEquals(0, loadQueues(server0).size());
}
private File createDestDir(String dirName)
{
File clientDir = new File(getClientLargeMessagesDir());
if (!clientDir.exists())
{
if (!clientDir.mkdirs())
{
throw new IllegalStateException("Can't create dir " + clientDir.getAbsolutePath());
}
}
File destDir = new File(clientDir, dirName);
if (!destDir.mkdir())
{
throw new IllegalStateException("Can't create dir " + destDir.getAbsolutePath());
}
return destDir;
}
private ClientMessage createLargeMessage(ClientSession session, long largeMessageSize, File destDir) throws Exception
{
File fileInput = new File(destDir, "huge_message_to_send.dat");
createFile(fileInput, largeMessageSize);
System.out.println("File created at: " + fileInput.getAbsolutePath());
ClientMessage message = session.createMessage(ClientMessage.BYTES_TYPE, true);
FileInputStream fileInputStream = new FileInputStream(fileInput);
BufferedInputStream bufferedInput = new BufferedInputStream(fileInputStream);
message.setBodyInputStream(bufferedInput);
return message;
}
private static void createFile(final File file, final long fileSize) throws IOException
{
if (file.exists())
{
System.out.println("---file already there " + file.length());
return;
}
FileOutputStream fileOut = new FileOutputStream(file);
BufferedOutputStream buffOut = new BufferedOutputStream(fileOut);
byte[] outBuffer = new byte[1024 * 1024];
System.out.println(" --- creating file, size: " + fileSize);
for (long i = 0; i < fileSize; i += outBuffer.length)
{
buffOut.write(outBuffer);
}
buffOut.close();
}
@Test
public void testNullForwardingAddress() throws Exception
{

View File

@ -375,6 +375,12 @@ public class BackupSyncDelay implements Interceptor
throw new UnsupportedOperationException();
}
@Override
public boolean largeServerCheck(long timeout)
{
return true;
}
@Override
public boolean supports(byte packetID)
{