This commit is contained in:
Clebert Suconic 2020-03-05 21:38:52 -05:00
commit 8927d07fb7
2 changed files with 42 additions and 1 deletions

View File

@ -41,6 +41,7 @@ import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
import org.apache.activemq.artemis.api.core.ActiveMQPropertyConversionException; import org.apache.activemq.artemis.api.core.ActiveMQPropertyConversionException;
import org.apache.activemq.artemis.api.core.ICoreMessage; import org.apache.activemq.artemis.api.core.ICoreMessage;
import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.api.core.management.ManagementHelper;
import org.apache.activemq.artemis.core.message.impl.CoreMessage; import org.apache.activemq.artemis.core.message.impl.CoreMessage;
import org.apache.activemq.artemis.core.persistence.CoreMessageObjectPools; import org.apache.activemq.artemis.core.persistence.CoreMessageObjectPools;
import org.apache.activemq.artemis.core.protocol.openwire.amq.AMQConsumer; import org.apache.activemq.artemis.core.protocol.openwire.amq.AMQConsumer;
@ -943,7 +944,7 @@ public final class OpenWireMessageConverter {
final AMQConsumer consumer) throws IOException { final AMQConsumer consumer) throws IOException {
for (SimpleString s : props) { for (SimpleString s : props) {
final String keyStr = s.toString(); final String keyStr = s.toString();
if (!consumer.hasNotificationDestination() && (keyStr.startsWith("_AMQ") || keyStr.startsWith("__HDR_"))) { if (!coreMessage.containsProperty(ManagementHelper.HDR_NOTIFICATION_TYPE) && (keyStr.startsWith("_AMQ") || keyStr.startsWith("__HDR_"))) {
continue; continue;
} }
final Object prop = coreMessage.getObjectProperty(s); final Object prop = coreMessage.getObjectProperty(s);

View File

@ -23,9 +23,13 @@ import javax.jms.MessageProducer;
import javax.jms.Queue; import javax.jms.Queue;
import javax.jms.Session; import javax.jms.Session;
import javax.jms.TextMessage; import javax.jms.TextMessage;
import javax.jms.Topic;
import javax.jms.TopicSubscriber;
import java.util.Collection; import java.util.Collection;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration;
import org.apache.activemq.artemis.api.core.Message; import org.apache.activemq.artemis.api.core.Message;
import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.api.core.SimpleString;
@ -51,6 +55,7 @@ import org.apache.activemq.artemis.core.server.impl.ServiceRegistryImpl;
import org.apache.activemq.artemis.core.settings.impl.AddressSettings; import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
import org.apache.activemq.artemis.tests.util.ActiveMQTestBase; import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
import org.apache.activemq.artemis.tests.util.CFUtil; import org.apache.activemq.artemis.tests.util.CFUtil;
import org.apache.activemq.command.ActiveMQTopic;
import org.junit.Assert; import org.junit.Assert;
import org.junit.Test; import org.junit.Test;
@ -58,6 +63,41 @@ public class DivertTest extends ActiveMQTestBase {
private static final int TIMEOUT = 3000; private static final int TIMEOUT = 3000;
@Test
public void testDivertedNotificationMessagePropertiesOpenWire() throws Exception {
final String testAddress = ActiveMQDefaultConfiguration.getDefaultManagementNotificationAddress().toString();
final String forwardAddress = "forwardAddress";
DivertConfiguration divertConf = new DivertConfiguration().setName("divert1").setRoutingName("divert1").setAddress(testAddress).setForwardingAddress(forwardAddress).setFilterString("_AMQ_NotifType = 'CONSUMER_CREATED' OR _AMQ_NotifType = 'CONSUMER_CLOSED'");
Configuration config = createDefaultNettyConfig().addDivertConfiguration(divertConf);
ActiveMQServer server = addServer(ActiveMQServers.newActiveMQServer(config, false));
server.start();
ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://localhost:61616");
connectionFactory.setClientID("myClientID");
Topic forwardTopic = new ActiveMQTopic(forwardAddress);
Connection connection = connectionFactory.createConnection();
connection.start();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
TopicSubscriber subscriber = session.createDurableSubscriber(forwardTopic, "mySubscriptionName");
javax.jms.Message message = subscriber.receive(DivertTest.TIMEOUT);
connection.close();
Assert.assertNotNull(message);
Assert.assertEquals("CONSUMER_CREATED", message.getStringProperty("_AMQ_NotifType"));
}
@Test @Test
public void testSingleNonExclusiveDivert() throws Exception { public void testSingleNonExclusiveDivert() throws Exception {
final String testAddress = "testAddress"; final String testAddress = "testAddress";