https://issues.apache.org/jira/browse/AMQ-6100 - Virtual topic message destination should be the target queue

This commit is contained in:
Dejan Bosanac 2015-12-21 15:19:01 +01:00
parent 370b181099
commit 4e63ee7cc7
5 changed files with 309 additions and 3 deletions

View File

@ -91,7 +91,7 @@ public class VirtualTopicInterceptor extends DestinationFilter {
public void run() {
try {
if (exceptionAtomicReference.get() == null) {
dest.send(context, message.copy());
dest.send(context, copy(message, dest.getActiveMQDestination()));
}
} catch (Exception e) {
exceptionAtomicReference.set(e);
@ -112,7 +112,7 @@ public class VirtualTopicInterceptor extends DestinationFilter {
} else {
for (final Destination dest : destinations) {
if (shouldDispatch(broker, message, dest)) {
dest.send(context, message.copy());
dest.send(context, copy(message, dest.getActiveMQDestination()));
}
}
}
@ -121,6 +121,13 @@ public class VirtualTopicInterceptor extends DestinationFilter {
}
}
private Message copy(Message original, ActiveMQDestination target) {
Message msg = original.copy();
msg.setDestination(target);
msg.setOriginalDestination(original.getDestination());
return msg;
}
private LocalTransactionId beginLocalTransaction(int numDestinations, ConnectionContext connectionContext, Message message) throws Exception {
LocalTransactionId result = null;
if (transactedSend && numDestinations > 1 && message.isPersistent() && message.getTransactionId() == null) {

View File

@ -0,0 +1,120 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.broker.virtual;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.broker.BrokerService;
import org.junit.Before;
import org.junit.Ignore;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
import javax.annotation.Resource;
import javax.jms.*;
import java.util.concurrent.CountDownLatch;
import static org.junit.Assert.assertEquals;
@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration({ "virtual-topic-network-test.xml" })
public class MessageDestinationVirtualTopicTest {
private static final Logger LOG = LoggerFactory.getLogger(MessageDestinationVirtualTopicTest.class);
private SimpleMessageListener listener1;
private SimpleMessageListener listener2;
@Resource(name = "broker1")
private BrokerService broker1;
@Resource(name = "broker2")
private BrokerService broker2;
private MessageProducer producer;
private Session session1;
public void init() throws JMSException {
// Create connection on Broker B2
ConnectionFactory broker2ConnectionFactory = new ActiveMQConnectionFactory("tcp://localhost:62616");
Connection connection2 = broker2ConnectionFactory.createConnection();
connection2.start();
Session session2 = connection2.createSession(false, Session.AUTO_ACKNOWLEDGE);
Queue consumerDQueue = session2.createQueue("Consumer.D.VirtualTopic.T1");
// Bind listener on queue for consumer D
MessageConsumer consumer = session2.createConsumer(consumerDQueue);
listener2 = new SimpleMessageListener();
consumer.setMessageListener(listener2);
// Create connection on Broker B1
ConnectionFactory broker1ConnectionFactory = new ActiveMQConnectionFactory("tcp://localhost:61616");
Connection connection1 = broker1ConnectionFactory.createConnection();
connection1.start();
session1 = connection1.createSession(false, Session.AUTO_ACKNOWLEDGE);
Queue consumerCQueue = session1.createQueue("Consumer.C.VirtualTopic.T1");
// Bind listener on queue for consumer D
MessageConsumer consumer1 = session1.createConsumer(consumerCQueue);
listener1 = new SimpleMessageListener();
consumer1.setMessageListener(listener1);
// Create producer for topic, on B1
Topic virtualTopicT1 = session1.createTopic("VirtualTopic.T1");
producer = session1.createProducer(virtualTopicT1);
}
@Test
public void testDestinationNames() throws Exception {
LOG.info("Started waiting for broker 1 and 2");
broker1.waitUntilStarted();
broker2.waitUntilStarted();
LOG.info("Broker 1 and 2 have started");
init();
// Create a monitor
CountDownLatch monitor = new CountDownLatch(2);
listener1.setCountDown(monitor);
listener2.setCountDown(monitor);
LOG.info("Sending message");
// Send a message on the topic
TextMessage message = session1.createTextMessage("Hello World !");
producer.send(message);
LOG.info("Waiting for message reception");
// Wait the two messages in the related queues
monitor.await();
// Get the message destinations
String lastJMSDestination2 = listener2.getLastJMSDestination();
System.err.println(lastJMSDestination2);
String lastJMSDestination1 = listener1.getLastJMSDestination();
System.err.println(lastJMSDestination1);
// The destination names
assertEquals("queue://Consumer.D.VirtualTopic.T1", lastJMSDestination2);
assertEquals("queue://Consumer.C.VirtualTopic.T1", lastJMSDestination1);
}
}

View File

@ -0,0 +1,79 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.broker.virtual;
import java.util.Enumeration;
import java.util.concurrent.CountDownLatch;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.TextMessage;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class SimpleMessageListener implements MessageListener {
private static final Logger LOG = LoggerFactory.getLogger(SimpleMessageListener.class);
private CountDownLatch messageReceivedToken;
private String lastJMSDestination;
@Override
public void onMessage(Message message) {
try {
Thread.sleep(2000L);
if (message instanceof TextMessage) {
LOG.info("Dest:" + message.getJMSDestination());
lastJMSDestination = message.getJMSDestination().toString();
Enumeration propertyNames = message.getPropertyNames();
while (propertyNames.hasMoreElements()) {
Object object = propertyNames.nextElement();
}
}
messageReceivedToken.countDown();
}
catch (JMSException e) {
LOG.error("Error while listening to a message", message);
}
catch (InterruptedException e) {
LOG.error("Interrupted while listening to a message", message);
}
}
/**
* @param countDown
* the countDown to set
*/
public void setCountDown(CountDownLatch countDown) {
this.messageReceivedToken = countDown;
}
/**
* @return the lastJMSDestination
*/
public String getLastJMSDestination() {
return lastJMSDestination;
}
}

View File

@ -69,7 +69,7 @@ public class VirtualTopicDLQTest extends TestCase {
// Expected Individual Dead Letter Queue names that are tied to the
// Subscriber Queues
private static final String dlqPrefix = "ActiveMQ.DLQ.Topic.";
private static final String dlqPrefix = "ActiveMQ.DLQ.Queue.";
// Number of messages
private static final int numberMessages = 6;

View File

@ -0,0 +1,100 @@
<!-- START SNIPPET: xbean -->
<beans xmlns="http://www.springframework.org/schema/beans" xmlns:amq="http://activemq.apache.org/schema/core"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:jms="http://www.springframework.org/schema/jms"
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/jms http://www.springframework.org/schema/jms/spring-jms.xsd
http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-core-5.11.0.xsd">
<bean class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer"/>
<!-- Broker 1 definition -->
<amq:broker xmlns="http://activemq.apache.org/schema/core" id="broker1" brokerName="B1" useJmx="false" useShutdownHook="false" useVirtualTopics="true" persistent="false" start="true" startAsync="true">
<!-- Transport protocol -->
<amq:transportConnectors>
<amq:transportConnector uri="tcp://localhost:61616" />
</amq:transportConnectors>
<!-- Network of brokers setup -->
<!--amq:networkConnectors>
<amq:networkConnector name="linkToBrokerB2" uri="static:(tcp://localhost:62616)" networkTTL="1" duplex="false"/>
</amq:networkConnectors-->
<amq:destinationInterceptors>
<amq:virtualDestinationInterceptor>
<amq:virtualDestinations>
<!-- Virtual topic policies -->
<!-- they should be local to avoid message duplicate -->
<amq:virtualTopic name="VirtualTopic.>" prefix="Consumer.*."/>
</amq:virtualDestinations>
</amq:virtualDestinationInterceptor>
</amq:destinationInterceptors>
<destinationPolicy>
<policyMap>
<policyEntries>
<policyEntry queue=">" producerFlowControl="true" memoryLimit="4 mb">
<networkBridgeFilterFactory>
<conditionalNetworkBridgeFilterFactory replayWhenNoConsumers="true" />
</networkBridgeFilterFactory>
</policyEntry>
</policyEntries>
</policyMap>
</destinationPolicy>
<amq:destinations>
<!-- topics -->
<amq:topic physicalName="VirtualTopic.T1" />
</amq:destinations>
</amq:broker>
<!-- Broker 2 definition -->
<amq:broker xmlns="http://activemq.apache.org/schema/core" id="broker2" brokerName="B2" useJmx="false" useShutdownHook="false" useVirtualTopics="true" persistent="false" start="true" startAsync="true">
<!-- Transport protocol -->
<amq:transportConnectors>
<amq:transportConnector uri="tcp://localhost:62616" />
</amq:transportConnectors>
<!-- Network of brokers setup -->
<amq:networkConnectors>
<amq:networkConnector name="linkToBrokerB1" uri="static:(tcp://localhost:61616)" networkTTL="1" duplex="true" />
</amq:networkConnectors>
<amq:destinationInterceptors>
<amq:virtualDestinationInterceptor>
<amq:virtualDestinations>
<!-- Virtual topic policies -->
<!-- they should be local to avoid message duplicate -->
<amq:virtualTopic name=">" prefix="Consumer.*."/>
</amq:virtualDestinations>
</amq:virtualDestinationInterceptor>
</amq:destinationInterceptors>
<destinationPolicy>
<policyMap>
<policyEntries>
<policyEntry queue=">" producerFlowControl="true" memoryLimit="4 mb">
<networkBridgeFilterFactory>
<conditionalNetworkBridgeFilterFactory replayWhenNoConsumers="true" />
</networkBridgeFilterFactory>
</policyEntry>
</policyEntries>
</policyMap>
</destinationPolicy>
<amq:destinations>
<!-- topics -->
<amq:topic physicalName="VirtualTopic.T1" />
</amq:destinations>
</amq:broker>
</beans>
<!-- END SNIPPET: xbean -->