ACTIVEMQ6-95 Large Message doesn't work on clustering & null Persistence
https://issues.apache.org/jira/browse/ACTIVEMQ6-95 The message.copy is broken when you set persistence=false, and the bridge will use that method before forwarding the message this commit is fixing NullStorageLargeServerMessage.copy and adding the proper testcase to validate the fix
This commit is contained in:
parent
ce0c3d9e63
commit
dea60ed3b6
|
@ -19,6 +19,7 @@ package org.apache.activemq.core.persistence.impl.nullpm;
|
||||||
import org.apache.activemq.api.core.ActiveMQBuffers;
|
import org.apache.activemq.api.core.ActiveMQBuffers;
|
||||||
import org.apache.activemq.core.journal.SequentialFile;
|
import org.apache.activemq.core.journal.SequentialFile;
|
||||||
import org.apache.activemq.core.server.LargeServerMessage;
|
import org.apache.activemq.core.server.LargeServerMessage;
|
||||||
|
import org.apache.activemq.core.server.ServerMessage;
|
||||||
import org.apache.activemq.core.server.impl.ServerMessageImpl;
|
import org.apache.activemq.core.server.impl.ServerMessageImpl;
|
||||||
|
|
||||||
class NullStorageLargeServerMessage extends ServerMessageImpl implements LargeServerMessage
|
class NullStorageLargeServerMessage extends ServerMessageImpl implements LargeServerMessage
|
||||||
|
@ -29,6 +30,11 @@ class NullStorageLargeServerMessage extends ServerMessageImpl implements LargeSe
|
||||||
super();
|
super();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public NullStorageLargeServerMessage(NullStorageLargeServerMessage other)
|
||||||
|
{
|
||||||
|
super(other);
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void releaseResources()
|
public void releaseResources()
|
||||||
{
|
{
|
||||||
|
@ -79,7 +85,13 @@ class NullStorageLargeServerMessage extends ServerMessageImpl implements LargeSe
|
||||||
@Override
|
@Override
|
||||||
public String toString()
|
public String toString()
|
||||||
{
|
{
|
||||||
return "LargeServerMessage[messageID=" + messageID + ", durable=" + durable + ", address=" + getAddress() + ",properties=" + properties.toString() + "]";
|
return "NullStorageLargeServerMessage[messageID=" + messageID + ", durable=" + durable + ", address=" + getAddress() + ",properties=" + properties.toString() + "]";
|
||||||
|
}
|
||||||
|
|
||||||
|
public ServerMessage copy()
|
||||||
|
{
|
||||||
|
// This is a simple copy, used only to avoid changing original properties
|
||||||
|
return new NullStorageLargeServerMessage(this);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
|
|
@ -282,7 +282,7 @@ public abstract class UnitTestCase extends CoreUnitTestCase
|
||||||
* @return
|
* @return
|
||||||
* @throws Exception
|
* @throws Exception
|
||||||
*/
|
*/
|
||||||
protected final ConfigurationImpl createBasicConfig(final int serverID)
|
protected ConfigurationImpl createBasicConfig(final int serverID)
|
||||||
{
|
{
|
||||||
ConfigurationImpl configuration = new ConfigurationImpl()
|
ConfigurationImpl configuration = new ConfigurationImpl()
|
||||||
.setSecurityEnabled(false)
|
.setSecurityEnabled(false)
|
||||||
|
|
|
@ -18,23 +18,62 @@ package org.apache.activemq.tests.integration.jms.cluster;
|
||||||
|
|
||||||
import javax.jms.BytesMessage;
|
import javax.jms.BytesMessage;
|
||||||
import javax.jms.Connection;
|
import javax.jms.Connection;
|
||||||
|
import javax.jms.MapMessage;
|
||||||
import javax.jms.MessageConsumer;
|
import javax.jms.MessageConsumer;
|
||||||
import javax.jms.MessageProducer;
|
import javax.jms.MessageProducer;
|
||||||
import javax.jms.Queue;
|
import javax.jms.Queue;
|
||||||
import javax.jms.Session;
|
import javax.jms.Session;
|
||||||
import javax.jms.TextMessage;
|
import javax.jms.TextMessage;
|
||||||
|
|
||||||
|
import java.util.Arrays;
|
||||||
|
import java.util.Collection;
|
||||||
|
|
||||||
import org.apache.activemq.api.core.TransportConfiguration;
|
import org.apache.activemq.api.core.TransportConfiguration;
|
||||||
import org.apache.activemq.api.jms.ActiveMQJMSClient;
|
import org.apache.activemq.api.jms.ActiveMQJMSClient;
|
||||||
import org.apache.activemq.api.jms.JMSFactoryType;
|
import org.apache.activemq.api.jms.JMSFactoryType;
|
||||||
import org.apache.activemq.core.config.ClusterConnectionConfiguration;
|
import org.apache.activemq.core.config.ClusterConnectionConfiguration;
|
||||||
import org.apache.activemq.core.config.Configuration;
|
import org.apache.activemq.core.config.Configuration;
|
||||||
|
import org.apache.activemq.core.config.impl.ConfigurationImpl;
|
||||||
import org.apache.activemq.jms.client.ActiveMQConnectionFactory;
|
import org.apache.activemq.jms.client.ActiveMQConnectionFactory;
|
||||||
import org.apache.activemq.tests.util.JMSClusteredTestBase;
|
import org.apache.activemq.tests.util.JMSClusteredTestBase;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
|
import org.junit.runner.RunWith;
|
||||||
|
import org.junit.runners.Parameterized;
|
||||||
|
|
||||||
|
@RunWith(value = Parameterized.class)
|
||||||
public class LargeMessageOverBridgeTest extends JMSClusteredTestBase
|
public class LargeMessageOverBridgeTest extends JMSClusteredTestBase
|
||||||
{
|
{
|
||||||
|
|
||||||
|
private final boolean persistent;
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected boolean enablePersistence()
|
||||||
|
{
|
||||||
|
return persistent;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Parameterized.Parameters(name = "persistent={0}")
|
||||||
|
public static Collection getParameters()
|
||||||
|
{
|
||||||
|
return Arrays.asList(new Object[][]{
|
||||||
|
{true},
|
||||||
|
{false}
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected final ConfigurationImpl createBasicConfig(final int serverID)
|
||||||
|
{
|
||||||
|
ConfigurationImpl configuration = super.createBasicConfig(serverID);
|
||||||
|
configuration.setJournalFileSize(1024 * 1024);
|
||||||
|
return configuration;
|
||||||
|
}
|
||||||
|
|
||||||
|
public LargeMessageOverBridgeTest(boolean persistent)
|
||||||
|
{
|
||||||
|
this.persistent = persistent;
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* This was causing a text message to ber eventually converted into large message when sent over the bridge
|
* This was causing a text message to ber eventually converted into large message when sent over the bridge
|
||||||
*
|
*
|
||||||
|
@ -78,6 +117,56 @@ public class LargeMessageOverBridgeTest extends JMSClusteredTestBase
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* This was causing a text message to ber eventually converted into large message when sent over the bridge
|
||||||
|
*
|
||||||
|
* @throws Exception
|
||||||
|
*/
|
||||||
|
@Test
|
||||||
|
public void testSendMapMessageOverCluster() throws Exception
|
||||||
|
{
|
||||||
|
createQueue("Q1");
|
||||||
|
|
||||||
|
Queue queue = (Queue)context1.lookup("queue/Q1");
|
||||||
|
Connection conn1 = cf1.createConnection();
|
||||||
|
Session session1 = conn1.createSession(false, Session.AUTO_ACKNOWLEDGE);
|
||||||
|
MessageProducer prod1 = session1.createProducer(queue);
|
||||||
|
|
||||||
|
Connection conn2 = cf2.createConnection();
|
||||||
|
Session session2 = conn2.createSession(false, Session.AUTO_ACKNOWLEDGE);
|
||||||
|
MessageConsumer cons2 = session2.createConsumer(queue);
|
||||||
|
conn2.start();
|
||||||
|
|
||||||
|
StringBuffer buffer = new StringBuffer();
|
||||||
|
|
||||||
|
for (int i = 0; i < 3810002; i++)
|
||||||
|
{
|
||||||
|
buffer.append('a');
|
||||||
|
}
|
||||||
|
|
||||||
|
final int NUMBER_OF_MESSAGES = 1;
|
||||||
|
|
||||||
|
for (int i = 0; i < NUMBER_OF_MESSAGES; i++)
|
||||||
|
{
|
||||||
|
MapMessage msg = session1.createMapMessage();
|
||||||
|
msg.setString("str", buffer.toString());
|
||||||
|
msg.setIntProperty("i", i);
|
||||||
|
prod1.send(msg);
|
||||||
|
}
|
||||||
|
|
||||||
|
for (int i = 0; i < NUMBER_OF_MESSAGES; i++)
|
||||||
|
{
|
||||||
|
MapMessage msg = (MapMessage)cons2.receive(5000);
|
||||||
|
assertEquals(buffer.toString(), msg.getString("str"));
|
||||||
|
}
|
||||||
|
|
||||||
|
assertNull(cons2.receiveNoWait());
|
||||||
|
|
||||||
|
conn1.close();
|
||||||
|
conn2.close();
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
protected Configuration createConfigServer2()
|
protected Configuration createConfigServer2()
|
||||||
{
|
{
|
||||||
|
@ -156,7 +245,7 @@ public class LargeMessageOverBridgeTest extends JMSClusteredTestBase
|
||||||
|
|
||||||
for (int i = 0; i < 5; i++)
|
for (int i = 0; i < 5; i++)
|
||||||
{
|
{
|
||||||
BytesMessage msg2 = (BytesMessage) cons2.receive(10000);
|
BytesMessage msg2 = (BytesMessage) cons2.receive(5000);
|
||||||
assertNotNull(msg2);
|
assertNotNull(msg2);
|
||||||
msg2.acknowledge();
|
msg2.acknowledge();
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue