From dea60ed3b6fb95a95ecd6286026b6b2a2d51787d Mon Sep 17 00:00:00 2001 From: Clebert Suconic Date: Tue, 31 Mar 2015 11:20:25 -0400 Subject: [PATCH] ACTIVEMQ6-95 Large Message doesn't work on clustering & null Persistence https://issues.apache.org/jira/browse/ACTIVEMQ6-95 The message.copy is broken when you set persistence=false, and the bridge will use that method before forwarding the message this commit is fixing NullStorageLargeServerMessage.copy and adding the proper testcase to validate the fix --- .../nullpm/NullStorageLargeServerMessage.java | 14 ++- .../activemq/tests/util/UnitTestCase.java | 2 +- .../cluster/LargeMessageOverBridgeTest.java | 91 ++++++++++++++++++- 3 files changed, 104 insertions(+), 3 deletions(-) diff --git a/activemq-server/src/main/java/org/apache/activemq/core/persistence/impl/nullpm/NullStorageLargeServerMessage.java b/activemq-server/src/main/java/org/apache/activemq/core/persistence/impl/nullpm/NullStorageLargeServerMessage.java index c52f81a46c..d61bb1efa4 100644 --- a/activemq-server/src/main/java/org/apache/activemq/core/persistence/impl/nullpm/NullStorageLargeServerMessage.java +++ b/activemq-server/src/main/java/org/apache/activemq/core/persistence/impl/nullpm/NullStorageLargeServerMessage.java @@ -19,6 +19,7 @@ package org.apache.activemq.core.persistence.impl.nullpm; import org.apache.activemq.api.core.ActiveMQBuffers; import org.apache.activemq.core.journal.SequentialFile; import org.apache.activemq.core.server.LargeServerMessage; +import org.apache.activemq.core.server.ServerMessage; import org.apache.activemq.core.server.impl.ServerMessageImpl; class NullStorageLargeServerMessage extends ServerMessageImpl implements LargeServerMessage @@ -29,6 +30,11 @@ class NullStorageLargeServerMessage extends ServerMessageImpl implements LargeSe super(); } + public NullStorageLargeServerMessage(NullStorageLargeServerMessage other) + { + super(other); + } + @Override public void releaseResources() { @@ -79,7 +85,13 @@ class NullStorageLargeServerMessage extends ServerMessageImpl implements LargeSe @Override public String toString() { - return "LargeServerMessage[messageID=" + messageID + ", durable=" + durable + ", address=" + getAddress() + ",properties=" + properties.toString() + "]"; + return "NullStorageLargeServerMessage[messageID=" + messageID + ", durable=" + durable + ", address=" + getAddress() + ",properties=" + properties.toString() + "]"; + } + + public ServerMessage copy() + { + // This is a simple copy, used only to avoid changing original properties + return new NullStorageLargeServerMessage(this); } @Override diff --git a/activemq-server/src/test/java/org/apache/activemq/tests/util/UnitTestCase.java b/activemq-server/src/test/java/org/apache/activemq/tests/util/UnitTestCase.java index 795a09edad..1bfdbf6d04 100644 --- a/activemq-server/src/test/java/org/apache/activemq/tests/util/UnitTestCase.java +++ b/activemq-server/src/test/java/org/apache/activemq/tests/util/UnitTestCase.java @@ -282,7 +282,7 @@ public abstract class UnitTestCase extends CoreUnitTestCase * @return * @throws Exception */ - protected final ConfigurationImpl createBasicConfig(final int serverID) + protected ConfigurationImpl createBasicConfig(final int serverID) { ConfigurationImpl configuration = new ConfigurationImpl() .setSecurityEnabled(false) diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/tests/integration/jms/cluster/LargeMessageOverBridgeTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/tests/integration/jms/cluster/LargeMessageOverBridgeTest.java index 827e106f82..90c9142aa4 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/tests/integration/jms/cluster/LargeMessageOverBridgeTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/tests/integration/jms/cluster/LargeMessageOverBridgeTest.java @@ -18,23 +18,62 @@ package org.apache.activemq.tests.integration.jms.cluster; import javax.jms.BytesMessage; import javax.jms.Connection; +import javax.jms.MapMessage; import javax.jms.MessageConsumer; import javax.jms.MessageProducer; import javax.jms.Queue; import javax.jms.Session; import javax.jms.TextMessage; +import java.util.Arrays; +import java.util.Collection; + import org.apache.activemq.api.core.TransportConfiguration; import org.apache.activemq.api.jms.ActiveMQJMSClient; import org.apache.activemq.api.jms.JMSFactoryType; import org.apache.activemq.core.config.ClusterConnectionConfiguration; import org.apache.activemq.core.config.Configuration; +import org.apache.activemq.core.config.impl.ConfigurationImpl; import org.apache.activemq.jms.client.ActiveMQConnectionFactory; import org.apache.activemq.tests.util.JMSClusteredTestBase; import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; +@RunWith(value = Parameterized.class) public class LargeMessageOverBridgeTest extends JMSClusteredTestBase { + + private final boolean persistent; + + @Override + protected boolean enablePersistence() + { + return persistent; + } + + @Parameterized.Parameters(name = "persistent={0}") + public static Collection getParameters() + { + return Arrays.asList(new Object[][]{ + {true}, + {false} + }); + } + + @Override + protected final ConfigurationImpl createBasicConfig(final int serverID) + { + ConfigurationImpl configuration = super.createBasicConfig(serverID); + configuration.setJournalFileSize(1024 * 1024); + return configuration; + } + + public LargeMessageOverBridgeTest(boolean persistent) + { + this.persistent = persistent; + } + /** * This was causing a text message to ber eventually converted into large message when sent over the bridge * @@ -78,6 +117,56 @@ public class LargeMessageOverBridgeTest extends JMSClusteredTestBase } + /** + * This was causing a text message to ber eventually converted into large message when sent over the bridge + * + * @throws Exception + */ + @Test + public void testSendMapMessageOverCluster() throws Exception + { + createQueue("Q1"); + + Queue queue = (Queue)context1.lookup("queue/Q1"); + Connection conn1 = cf1.createConnection(); + Session session1 = conn1.createSession(false, Session.AUTO_ACKNOWLEDGE); + MessageProducer prod1 = session1.createProducer(queue); + + Connection conn2 = cf2.createConnection(); + Session session2 = conn2.createSession(false, Session.AUTO_ACKNOWLEDGE); + MessageConsumer cons2 = session2.createConsumer(queue); + conn2.start(); + + StringBuffer buffer = new StringBuffer(); + + for (int i = 0; i < 3810002; i++) + { + buffer.append('a'); + } + + final int NUMBER_OF_MESSAGES = 1; + + for (int i = 0; i < NUMBER_OF_MESSAGES; i++) + { + MapMessage msg = session1.createMapMessage(); + msg.setString("str", buffer.toString()); + msg.setIntProperty("i", i); + prod1.send(msg); + } + + for (int i = 0; i < NUMBER_OF_MESSAGES; i++) + { + MapMessage msg = (MapMessage)cons2.receive(5000); + assertEquals(buffer.toString(), msg.getString("str")); + } + + assertNull(cons2.receiveNoWait()); + + conn1.close(); + conn2.close(); + + } + protected Configuration createConfigServer2() { @@ -156,7 +245,7 @@ public class LargeMessageOverBridgeTest extends JMSClusteredTestBase for (int i = 0; i < 5; i++) { - BytesMessage msg2 = (BytesMessage) cons2.receive(10000); + BytesMessage msg2 = (BytesMessage) cons2.receive(5000); assertNotNull(msg2); msg2.acknowledge();