ARTEMIS-1227 Internal properties not removed from messages
In a cluster if a node is shut down (or crashed) when a message is being routed to a remote binding, a internal property may be added to the message and persisted. The name of the property is like _AMQ_ROUTE_TOsf.my-cluster*. if the node starts back, it will load and reroute this message and if it goes to a local consumer, this property won't get removed and goes to the client. The fix is to remove this internal property before it is sent to any client.
This commit is contained in:
parent
964cbcad07
commit
312d53d5ab
|
@ -147,7 +147,8 @@ public class MessageUtil {
|
|||
|
||||
for (SimpleString propName : message.getPropertyNames()) {
|
||||
if ((!propName.startsWith(JMS) || propName.startsWith(JMSX) ||
|
||||
propName.startsWith(JMS_)) && !propName.startsWith(CONNECTION_ID_PROPERTY_NAME) && !propName.equals(Message.HDR_ROUTING_TYPE)) {
|
||||
propName.startsWith(JMS_)) && !propName.startsWith(CONNECTION_ID_PROPERTY_NAME) && !propName.equals(Message.HDR_ROUTING_TYPE) &&
|
||||
!propName.startsWith(Message.HDR_ROUTE_TO_IDS)) {
|
||||
set.add(propName.toString());
|
||||
}
|
||||
}
|
||||
|
|
|
@ -18,15 +18,22 @@ package org.apache.activemq.artemis.tests.integration.jms.cluster;
|
|||
|
||||
import javax.jms.Connection;
|
||||
import javax.jms.DeliveryMode;
|
||||
import javax.jms.Message;
|
||||
import javax.jms.MessageConsumer;
|
||||
import javax.jms.MessageProducer;
|
||||
import javax.jms.Session;
|
||||
import javax.jms.TextMessage;
|
||||
import javax.jms.Topic;
|
||||
|
||||
import org.apache.activemq.artemis.api.core.SimpleString;
|
||||
import org.apache.activemq.artemis.api.core.client.ClientMessage;
|
||||
import org.apache.activemq.artemis.jms.client.ActiveMQMessage;
|
||||
import org.apache.activemq.artemis.tests.util.JMSClusteredTestBase;
|
||||
import org.junit.Test;
|
||||
|
||||
import java.util.Enumeration;
|
||||
import java.util.Set;
|
||||
|
||||
public class TopicClusterTest extends JMSClusteredTestBase {
|
||||
|
||||
// TODO: required to match cluster-connection
|
||||
|
@ -95,6 +102,89 @@ public class TopicClusterTest extends JMSClusteredTestBase {
|
|||
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testInternalPropertyNotExposed() throws Exception {
|
||||
Connection conn1 = cf1.createConnection();
|
||||
|
||||
conn1.setClientID("someClient1");
|
||||
|
||||
Connection conn2 = cf2.createConnection();
|
||||
|
||||
conn2.setClientID("someClient2");
|
||||
|
||||
conn1.start();
|
||||
|
||||
conn2.start();
|
||||
|
||||
try {
|
||||
|
||||
Topic topic1 = createTopic(TOPIC, true);
|
||||
|
||||
Session session1 = conn1.createSession(false, Session.AUTO_ACKNOWLEDGE);
|
||||
|
||||
Session session2 = conn2.createSession(false, Session.AUTO_ACKNOWLEDGE);
|
||||
|
||||
MessageProducer prod1 = session1.createProducer(topic1);
|
||||
prod1.setDeliveryMode(DeliveryMode.PERSISTENT);
|
||||
|
||||
MessageConsumer cons1 = session1.createDurableSubscriber(topic1, "sub1");
|
||||
MessageConsumer cons2 = session2.createDurableSubscriber(topic1, "sub2");
|
||||
|
||||
waitForBindings(server1, TOPIC, true, 1, 1, 2000);
|
||||
waitForBindings(server2, TOPIC, true, 1, 1, 2000);
|
||||
waitForBindings(server1, TOPIC, false, 1, 1, 2000);
|
||||
waitForBindings(server2, TOPIC, false, 1, 1, 2000);
|
||||
final int num = 1;
|
||||
for (int i = 0; i < num; i++) {
|
||||
prod1.send(session1.createTextMessage("someMessage" + i));
|
||||
}
|
||||
|
||||
for (int i = 0; i < num; i++) {
|
||||
TextMessage m2 = (TextMessage)cons2.receive(5000);
|
||||
assertNotNull(m2);
|
||||
TextMessage m1 = (TextMessage)cons1.receive(5000);
|
||||
assertNotNull(m1);
|
||||
checkInternalProperty(m1, m2);
|
||||
}
|
||||
|
||||
} finally {
|
||||
conn1.close();
|
||||
conn2.close();
|
||||
|
||||
jmsServer1.destroyTopic(TOPIC);
|
||||
jmsServer2.destroyTopic(TOPIC);
|
||||
}
|
||||
}
|
||||
|
||||
//check that the internal property is in the core
|
||||
//but didn't exposed to jms
|
||||
private void checkInternalProperty(Message... msgs) throws Exception {
|
||||
boolean checked = false;
|
||||
for (Message m : msgs) {
|
||||
ActiveMQMessage hqMessage = (ActiveMQMessage) m;
|
||||
ClientMessage coreMessage = hqMessage.getCoreMessage();
|
||||
Set<SimpleString> coreProps = coreMessage.getPropertyNames();
|
||||
System.out.println("core props: " + coreProps);
|
||||
boolean exist = false;
|
||||
for (SimpleString prop : coreProps) {
|
||||
if (prop.startsWith(org.apache.activemq.artemis.api.core.Message.HDR_ROUTE_TO_IDS)) {
|
||||
exist = true;
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
if (exist) {
|
||||
Enumeration enumProps = m.getPropertyNames();
|
||||
while (enumProps.hasMoreElements()) {
|
||||
String propName = (String) enumProps.nextElement();
|
||||
assertFalse("Shouldn't be in jms property: " + propName, propName.startsWith(org.apache.activemq.artemis.api.core.Message.HDR_ROUTE_TO_IDS.toString()));
|
||||
}
|
||||
checked = true;
|
||||
}
|
||||
}
|
||||
assertTrue(checked);
|
||||
}
|
||||
|
||||
// Package protected ---------------------------------------------
|
||||
|
||||
// Protected -----------------------------------------------------
|
||||
|
|
Loading…
Reference in New Issue