From f465996444fdb6fcf33e1ce2262d6ea2aae0cdb1 Mon Sep 17 00:00:00 2001 From: Howard Gao Date: Mon, 12 Jun 2017 20:30:41 +0800 Subject: [PATCH] ARTEMIS-1227 Internal properties not removed from messages In a cluster if a node is shut down (or crashed) when a message is being routed to a remote binding, a internal property may be added to the message and persisted. The name of the property is like _AMQ_ROUTE_TOsf.my-cluster*. if the node starts back, it will load and reroute this message and if it goes to a local consumer, this property won't get removed and goes to the client. The fix is to remove this internal property before it is sent to any client. --- .../core/postoffice/impl/PostOfficeImpl.java | 2 +- .../extras/byteman/MessageRerouteTest.java | 136 ++++++++++++++++++ 2 files changed, 137 insertions(+), 1 deletion(-) create mode 100644 tests/extra-tests/src/test/java/org/apache/activemq/artemis/tests/extras/byteman/MessageRerouteTest.java diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java index 4627325d4e..b9a9cfaf4c 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java @@ -821,7 +821,6 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding public MessageReference reroute(final Message message, final Queue queue, final Transaction tx) throws Exception { - setPagingStore(message); MessageReference reference = MessageReference.Factory.createReference(message, queue); @@ -830,6 +829,7 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding if (scheduledDeliveryTime != null) { reference.setScheduledDeliveryTime(scheduledDeliveryTime); } + message.cleanupInternalProperties(); message.incrementDurableRefCount(); diff --git a/tests/extra-tests/src/test/java/org/apache/activemq/artemis/tests/extras/byteman/MessageRerouteTest.java b/tests/extra-tests/src/test/java/org/apache/activemq/artemis/tests/extras/byteman/MessageRerouteTest.java new file mode 100644 index 0000000000..39c03164db --- /dev/null +++ b/tests/extra-tests/src/test/java/org/apache/activemq/artemis/tests/extras/byteman/MessageRerouteTest.java @@ -0,0 +1,136 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.artemis.tests.extras.byteman; + +import org.apache.activemq.artemis.api.core.Message; +import org.apache.activemq.artemis.api.core.RoutingType; +import org.apache.activemq.artemis.api.core.SimpleString; +import org.apache.activemq.artemis.api.core.client.ClientConsumer; +import org.apache.activemq.artemis.api.core.client.ClientMessage; +import org.apache.activemq.artemis.api.core.client.ClientProducer; +import org.apache.activemq.artemis.api.core.client.ClientSession; +import org.apache.activemq.artemis.api.core.client.ClientSessionFactory; +import org.apache.activemq.artemis.api.core.client.ServerLocator; +import org.apache.activemq.artemis.core.server.ActiveMQServer; +import org.apache.activemq.artemis.tests.util.ActiveMQTestBase; +import org.jboss.byteman.contrib.bmunit.BMRule; +import org.jboss.byteman.contrib.bmunit.BMRules; +import org.jboss.byteman.contrib.bmunit.BMUnitRunner; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; + +import static org.apache.activemq.artemis.api.core.Message.HDR_ROUTE_TO_IDS; + +@RunWith(BMUnitRunner.class) +public class MessageRerouteTest extends ActiveMQTestBase { + + public static final SimpleString ADDRESS = new SimpleString("SimpleAddress"); + private static boolean poisonMessage = false; + private static final String INTERNAL_PROP = HDR_ROUTE_TO_IDS + "sf.my-clusterxxxxxx"; + + protected ActiveMQServer server = null; + + @Override + @Before + public void setUp() throws Exception { + super.setUp(); + server = createServer(createDefaultNettyConfig()); + server.getConfiguration().setJMXManagementEnabled(true); + + server.start(); + } + + @Override + @After + public void tearDown() throws Exception { + if (server != null) { + server.stop(); + } + super.tearDown(); + } + + @Test + @BMRules + ( + rules = + { + @BMRule + ( + name = "insert a internal property to message", + targetClass = "org.apache.activemq.artemis.core.postoffice.impl.PostOfficeImpl", + targetMethod = "processRoute(org.apache.activemq.artemis.api.core.Message, org.apache.activemq.artemis.core.server.RoutingContext, boolean)", + targetLocation = "AT ENTRY", + action = "org.apache.activemq.artemis.tests.extras.byteman.MessageRerouteTest.insertProperty($1);" + + ) + } + ) + // this test inserts a fake prop to a message before PostOffice.processRoute() + // then stop the server and restart again + // attach a consumer to receive the message, make sure the fake prop is gone. + public void testInternalPropertyRemoved() throws Exception { + ServerLocator locator = createNettyNonHALocator(); + locator.setBlockOnNonDurableSend(true); + locator.setBlockOnDurableSend(true); + locator.setBlockOnAcknowledge(true); + + ClientSessionFactory sf = createSessionFactory(locator); + ClientSession session = sf.createSession("guest", null, false, true, true, false, 0); + + session.createQueue(ADDRESS, RoutingType.ANYCAST, ADDRESS, true); + + ClientProducer producer = session.createProducer(ADDRESS); + + ClientMessage message = session.createMessage(true); + message.getBodyBuffer().writeBytes(new byte[24]); + + poisonMessage = true; + final int num = 20; + for (int i = 0; i < num; i++) { + producer.send(message); + } + + poisonMessage = false; + session.close(); + server.stop(); + + server.start(); + waitForServerToStart(server); + locator = createNettyNonHALocator(); + + sf = createSessionFactory(locator); + session = sf.createSession("guest", null, false, true, true, false, 0); + session.start(); + ClientConsumer consumer = session.createConsumer(ADDRESS); + for (int i = 0; i < num; i++) { + ClientMessage message1 = consumer.receive(2000); + System.out.println("message received: " + message1.getPropertyNames()); + assertFalse(message1.getPropertyNames().contains(new SimpleString(INTERNAL_PROP))); + } + + } + + public static void insertProperty(Message serverMessage) { + if (poisonMessage) { + byte[] ids = new byte[] {1, 2, 3, 4, 5, 6, 7, 8}; + serverMessage.putBytesProperty(INTERNAL_PROP, ids); + } + } + +}