From 312d53d5abe7949f554a00bcf6e7f0dc2c44507e Mon Sep 17 00:00:00 2001 From: Howard Gao Date: Tue, 10 Oct 2017 09:14:32 +0800 Subject: [PATCH] ARTEMIS-1227 Internal properties not removed from messages In a cluster if a node is shut down (or crashed) when a message is being routed to a remote binding, a internal property may be added to the message and persisted. The name of the property is like _AMQ_ROUTE_TOsf.my-cluster*. if the node starts back, it will load and reroute this message and if it goes to a local consumer, this property won't get removed and goes to the client. The fix is to remove this internal property before it is sent to any client. --- .../activemq/artemis/reader/MessageUtil.java | 3 +- .../jms/cluster/TopicClusterTest.java | 90 +++++++++++++++++++ 2 files changed, 92 insertions(+), 1 deletion(-) diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/reader/MessageUtil.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/reader/MessageUtil.java index 9d37cd3d96..2660f96991 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/reader/MessageUtil.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/reader/MessageUtil.java @@ -147,7 +147,8 @@ public class MessageUtil { for (SimpleString propName : message.getPropertyNames()) { if ((!propName.startsWith(JMS) || propName.startsWith(JMSX) || - propName.startsWith(JMS_)) && !propName.startsWith(CONNECTION_ID_PROPERTY_NAME) && !propName.equals(Message.HDR_ROUTING_TYPE)) { + propName.startsWith(JMS_)) && !propName.startsWith(CONNECTION_ID_PROPERTY_NAME) && !propName.equals(Message.HDR_ROUTING_TYPE) && + !propName.startsWith(Message.HDR_ROUTE_TO_IDS)) { set.add(propName.toString()); } } diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/cluster/TopicClusterTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/cluster/TopicClusterTest.java index 2b9dd6fcf0..00533da211 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/cluster/TopicClusterTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/cluster/TopicClusterTest.java @@ -18,15 +18,22 @@ package org.apache.activemq.artemis.tests.integration.jms.cluster; import javax.jms.Connection; import javax.jms.DeliveryMode; +import javax.jms.Message; import javax.jms.MessageConsumer; import javax.jms.MessageProducer; import javax.jms.Session; import javax.jms.TextMessage; import javax.jms.Topic; +import org.apache.activemq.artemis.api.core.SimpleString; +import org.apache.activemq.artemis.api.core.client.ClientMessage; +import org.apache.activemq.artemis.jms.client.ActiveMQMessage; import org.apache.activemq.artemis.tests.util.JMSClusteredTestBase; import org.junit.Test; +import java.util.Enumeration; +import java.util.Set; + public class TopicClusterTest extends JMSClusteredTestBase { // TODO: required to match cluster-connection @@ -95,6 +102,89 @@ public class TopicClusterTest extends JMSClusteredTestBase { } + @Test + public void testInternalPropertyNotExposed() throws Exception { + Connection conn1 = cf1.createConnection(); + + conn1.setClientID("someClient1"); + + Connection conn2 = cf2.createConnection(); + + conn2.setClientID("someClient2"); + + conn1.start(); + + conn2.start(); + + try { + + Topic topic1 = createTopic(TOPIC, true); + + Session session1 = conn1.createSession(false, Session.AUTO_ACKNOWLEDGE); + + Session session2 = conn2.createSession(false, Session.AUTO_ACKNOWLEDGE); + + MessageProducer prod1 = session1.createProducer(topic1); + prod1.setDeliveryMode(DeliveryMode.PERSISTENT); + + MessageConsumer cons1 = session1.createDurableSubscriber(topic1, "sub1"); + MessageConsumer cons2 = session2.createDurableSubscriber(topic1, "sub2"); + + waitForBindings(server1, TOPIC, true, 1, 1, 2000); + waitForBindings(server2, TOPIC, true, 1, 1, 2000); + waitForBindings(server1, TOPIC, false, 1, 1, 2000); + waitForBindings(server2, TOPIC, false, 1, 1, 2000); + final int num = 1; + for (int i = 0; i < num; i++) { + prod1.send(session1.createTextMessage("someMessage" + i)); + } + + for (int i = 0; i < num; i++) { + TextMessage m2 = (TextMessage)cons2.receive(5000); + assertNotNull(m2); + TextMessage m1 = (TextMessage)cons1.receive(5000); + assertNotNull(m1); + checkInternalProperty(m1, m2); + } + + } finally { + conn1.close(); + conn2.close(); + + jmsServer1.destroyTopic(TOPIC); + jmsServer2.destroyTopic(TOPIC); + } + } + + //check that the internal property is in the core + //but didn't exposed to jms + private void checkInternalProperty(Message... msgs) throws Exception { + boolean checked = false; + for (Message m : msgs) { + ActiveMQMessage hqMessage = (ActiveMQMessage) m; + ClientMessage coreMessage = hqMessage.getCoreMessage(); + Set coreProps = coreMessage.getPropertyNames(); + System.out.println("core props: " + coreProps); + boolean exist = false; + for (SimpleString prop : coreProps) { + if (prop.startsWith(org.apache.activemq.artemis.api.core.Message.HDR_ROUTE_TO_IDS)) { + exist = true; + break; + } + } + + if (exist) { + Enumeration enumProps = m.getPropertyNames(); + while (enumProps.hasMoreElements()) { + String propName = (String) enumProps.nextElement(); + assertFalse("Shouldn't be in jms property: " + propName, propName.startsWith(org.apache.activemq.artemis.api.core.Message.HDR_ROUTE_TO_IDS.toString())); + } + checked = true; + } + } + assertTrue(checked); + } + // Package protected --------------------------------------------- // Protected -----------------------------------------------------