added patch for https://issues.apache.org/activemq/browse/AMQ-1435 to handle Mirrored Queues (so that folks can consume on a topic what messages are sent to a queue)

git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@579941 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
James Strachan 2007-09-27 08:26:57 +00:00
parent 8cad176f68
commit 42b768aa1f
8 changed files with 320 additions and 34 deletions

View File

@ -39,22 +39,15 @@ import org.apache.activemq.ActiveMQConnectionMetaData;
import org.apache.activemq.Service;
import org.apache.activemq.advisory.AdvisoryBroker;
import org.apache.activemq.broker.ft.MasterConnector;
import org.apache.activemq.broker.jmx.BrokerView;
import org.apache.activemq.broker.jmx.ConnectorView;
import org.apache.activemq.broker.jmx.ConnectorViewMBean;
import org.apache.activemq.broker.jmx.FTConnectorView;
import org.apache.activemq.broker.jmx.JmsConnectorView;
import org.apache.activemq.broker.jmx.ManagedRegionBroker;
import org.apache.activemq.broker.jmx.ManagementContext;
import org.apache.activemq.broker.jmx.NetworkConnectorView;
import org.apache.activemq.broker.jmx.NetworkConnectorViewMBean;
import org.apache.activemq.broker.jmx.ProxyConnectorView;
import org.apache.activemq.broker.jmx.*;
import org.apache.activemq.broker.region.CompositeDestinationInterceptor;
import org.apache.activemq.broker.region.Destination;
import org.apache.activemq.broker.region.DestinationFactory;
import org.apache.activemq.broker.region.DestinationFactoryImpl;
import org.apache.activemq.broker.region.DestinationInterceptor;
import org.apache.activemq.broker.region.RegionBroker;
import org.apache.activemq.broker.region.policy.PolicyMap;
import org.apache.activemq.broker.region.virtual.MirroredQueue;
import org.apache.activemq.broker.region.virtual.VirtualDestination;
import org.apache.activemq.broker.region.virtual.VirtualDestinationInterceptor;
import org.apache.activemq.broker.region.virtual.VirtualTopic;
@ -148,6 +141,7 @@ public class BrokerService implements Service {
private BrokerPlugin[] plugins;
private boolean keepDurableSubsActive = true;
private boolean useVirtualTopics = true;
private boolean useMirroredQueues = false;
private BrokerId brokerId;
private DestinationInterceptor[] destinationInterceptors;
private ActiveMQDestination[] destinations;
@ -1044,6 +1038,20 @@ public class BrokerService implements Service {
return destinationInterceptors;
}
public boolean isUseMirroredQueues() {
return useMirroredQueues;
}
/**
* Sets whether or not <a
* href="http://activemq.apache.org/mirrored-queues.html">Mirrored
* Queues</a> should be supported by default if they have not been
* explicitly configured.
*/
public void setUseMirroredQueues(boolean useMirroredQueues) {
this.useMirroredQueues = useMirroredQueues;
}
/**
* Sets the destination interceptors to use
*/
@ -1160,6 +1168,13 @@ public class BrokerService implements Service {
this.clustered = clustered;
}
/**
* Looks up and lazily creates if necessary the destination for the given JMS name
*/
public Destination getDestination(ActiveMQDestination destination) throws Exception {
return getBroker().addDestination(getAdminConnectionContext(), destination);
}
// Implementation methods
// -------------------------------------------------------------------------
/**
@ -1399,12 +1414,12 @@ public class BrokerService implements Service {
}
getPersistenceAdapter().start();
DestinationInterceptor destinationInterceptor = null;
if (destinationInterceptors != null) {
destinationInterceptor = new CompositeDestinationInterceptor(destinationInterceptors);
} else {
destinationInterceptor = createDefaultDestinationInterceptor();
if (destinationInterceptors == null) {
destinationInterceptors = createDefaultDestinationInterceptor();
}
configureServices(destinationInterceptors);
DestinationInterceptor destinationInterceptor = new CompositeDestinationInterceptor(destinationInterceptors);
RegionBroker regionBroker = null;
if (destinationFactory == null) {
destinationFactory = new DestinationFactoryImpl(getProducerSystemUsage(), getTaskRunnerFactory(), getPersistenceAdapter());
@ -1426,16 +1441,23 @@ public class BrokerService implements Service {
/**
* Create the default destination interceptor
*/
protected DestinationInterceptor createDefaultDestinationInterceptor() {
if (!isUseVirtualTopics()) {
return null;
protected DestinationInterceptor[] createDefaultDestinationInterceptor() {
List<DestinationInterceptor> answer = new ArrayList<DestinationInterceptor>();
if (isUseVirtualTopics()) {
VirtualDestinationInterceptor interceptor = new VirtualDestinationInterceptor();
VirtualTopic virtualTopic = new VirtualTopic();
virtualTopic.setName("VirtualTopic.>");
VirtualDestination[] virtualDestinations = {virtualTopic};
interceptor.setVirtualDestinations(virtualDestinations);
answer.add(interceptor);
}
VirtualDestinationInterceptor answer = new VirtualDestinationInterceptor();
VirtualTopic virtualTopic = new VirtualTopic();
virtualTopic.setName("VirtualTopic.>");
VirtualDestination[] virtualDestinations = {virtualTopic};
answer.setVirtualDestinations(virtualDestinations);
return answer;
if (isUseMirroredQueues()) {
MirroredQueue interceptor = new MirroredQueue();
answer.add(interceptor);
}
DestinationInterceptor[] array = new DestinationInterceptor[answer.size()];
answer.toArray(array);
return array;
}
/**
@ -1667,6 +1689,15 @@ public class BrokerService implements Service {
return connector;
}
/**
* Perform any custom dependency injection
*/
protected void configureServices(Object[] services) {
for (Object service : services) {
configureService(service);
}
}
/**
* Perform any custom dependency injection
*/

View File

@ -32,7 +32,7 @@ public abstract class CompositeDestination implements VirtualDestination {
private boolean copyMessage = true;
public Destination intercept(Destination destination) {
return new CompositeDestinationInterceptor(destination, getForwardTo(), isForwardOnly(), isCopyMessage());
return new CompositeDestinationFilter(destination, getForwardTo(), isForwardOnly(), isCopyMessage());
}
public String getName() {

View File

@ -32,13 +32,13 @@ import org.apache.activemq.filter.MessageEvaluationContext;
*
* @version $Revision$
*/
public class CompositeDestinationInterceptor extends DestinationFilter {
public class CompositeDestinationFilter extends DestinationFilter {
private Collection forwardDestinations;
private boolean forwardOnly;
private boolean copyMessage;
public CompositeDestinationInterceptor(Destination next, Collection forwardDestinations, boolean forwardOnly, boolean copyMessage) {
public CompositeDestinationFilter(Destination next, Collection forwardDestinations, boolean forwardOnly, boolean copyMessage) {
super(next);
this.forwardDestinations = forwardDestinations;
this.forwardOnly = forwardOnly;

View File

@ -0,0 +1,127 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.broker.region.virtual;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.BrokerServiceAware;
import org.apache.activemq.broker.ProducerBrokerExchange;
import org.apache.activemq.broker.region.Destination;
import org.apache.activemq.broker.region.DestinationFilter;
import org.apache.activemq.broker.region.DestinationInterceptor;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQTopic;
import org.apache.activemq.command.Message;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
/**
* Creates <a href="http://activemq.org/site/mirrored-queue.html">Mirrored
* Queue</a> using a prefix and postfix to define the topic name on which to mirror the queue to.
*
* @version $Revision$
* @org.apache.xbean.XBean
*/
public class MirroredQueue implements DestinationInterceptor, BrokerServiceAware {
private static final transient Log LOG = LogFactory.getLog(MirroredQueue.class);
private String prefix = "VirtualTopic.Mirror.";
private String postfix = "";
private boolean copyMessage = true;
private BrokerService brokerService;
public Destination intercept(final Destination destination) {
if (destination.getActiveMQDestination().isQueue()) {
try {
final Destination mirrorDestination = getMirrorDestination(destination);
if (mirrorDestination != null) {
return new DestinationFilter(destination) {
public void send(ProducerBrokerExchange context, Message message) throws Exception {
message.setDestination(mirrorDestination.getActiveMQDestination());
mirrorDestination.send(context, message);
if (isCopyMessage()) {
message = message.copy();
}
message.setDestination(destination.getActiveMQDestination());
super.send(context, message);
}
};
}
}
catch (Exception e) {
LOG.error("Failed to lookup the mirror destination for: " + destination + ". Reason: " + e, e);
}
}
return destination;
}
// Properties
// -------------------------------------------------------------------------
public String getPostfix() {
return postfix;
}
/**
* Sets any postix used to identify the queue consumers
*/
public void setPostfix(String postfix) {
this.postfix = postfix;
}
public String getPrefix() {
return prefix;
}
/**
* Sets the prefix wildcard used to identify the queue consumers for a given
* topic
*/
public void setPrefix(String prefix) {
this.prefix = prefix;
}
public boolean isCopyMessage() {
return copyMessage;
}
/**
* Sets whether a copy of the message will be sent to each destination.
* Defaults to true so that the forward destination is set as the
* destination of the message
*/
public void setCopyMessage(boolean copyMessage) {
this.copyMessage = copyMessage;
}
public void setBrokerService(BrokerService brokerService) {
this.brokerService = brokerService;
}
// Implementation methods
//-------------------------------------------------------------------------
protected Destination getMirrorDestination(Destination destination) throws Exception {
if (brokerService == null) {
throw new IllegalArgumentException("No brokerService injected!");
}
ActiveMQDestination topic = getMirrorTopic(destination.getActiveMQDestination());
return brokerService.getDestination(topic);
}
protected ActiveMQDestination getMirrorTopic(ActiveMQDestination original) {
return new ActiveMQTopic(prefix + original.getPhysicalName() + postfix);
}
}

View File

@ -80,11 +80,4 @@ public class VirtualTopic implements VirtualDestination {
this.name = name;
}
// Implementation methods
// -------------------------------------------------------------------------
protected ActiveMQDestination getQueueConsumersWildcard(ActiveMQDestination original) {
return new ActiveMQQueue(prefix + original.getPhysicalName() + postfix);
}
}

View File

@ -43,6 +43,7 @@ public class VirtualTopicInterceptor extends DestinationFilter {
public void send(ProducerBrokerExchange context, Message message) throws Exception {
ActiveMQDestination queueConsumers = getQueueConsumersWildcard(message.getDestination());
send(context, message, queueConsumers);
super.send(context, message);
}
protected ActiveMQDestination getQueueConsumersWildcard(ActiveMQDestination original) {

View File

@ -0,0 +1,100 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.broker.virtual;
import javax.jms.Connection;
import javax.jms.Destination;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Session;
import org.apache.activemq.EmbeddedBrokerTestSupport;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.ActiveMQTopic;
import org.apache.activemq.spring.ConsumerBean;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
/**
* @version $Revision: $
*/
public class MirroredQueueTest extends EmbeddedBrokerTestSupport {
private static final transient Log LOG = LogFactory.getLog(MirroredQueueTest.class);
private Connection connection;
public void testSendingToQueueIsMirrored() throws Exception {
if (connection == null) {
connection = createConnection();
}
connection.start();
ConsumerBean messageList = new ConsumerBean();
messageList.setVerbose(true);
Destination consumeDestination = createConsumeDestination();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
LOG.info("Consuming from: " + consumeDestination);
MessageConsumer c1 = session.createConsumer(consumeDestination);
c1.setMessageListener(messageList);
// create topic producer
ActiveMQQueue sendDestination = new ActiveMQQueue(getQueueName());
LOG.info("Sending to: " + sendDestination);
MessageProducer producer = session.createProducer(sendDestination);
assertNotNull(producer);
int total = 10;
for (int i = 0; i < total; i++) {
producer.send(session.createTextMessage("message: " + i));
}
///Thread.sleep(1000000);
messageList.assertMessagesArrived(total);
LOG.info("Received: " + messageList);
System.out.println("Received: " + messageList.flushMessages());
}
protected Destination createConsumeDestination() {
return new ActiveMQTopic("VirtualTopic.Mirror." + getQueueName());
}
protected String getQueueName() {
return "My.Queue";
}
@Override
protected BrokerService createBroker() throws Exception {
BrokerService answer = new BrokerService();
answer.setUseMirroredQueues(true);
answer.setPersistent(isPersistent());
answer.addConnector(bindAddress);
return answer;
}
protected void tearDown() throws Exception {
if (connection != null) {
connection.close();
}
super.tearDown();
}
}

View File

@ -0,0 +1,34 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.broker.virtual;
import javax.jms.Destination;
import org.apache.activemq.command.ActiveMQQueue;
/**
*
* @version $Revision: $
*/
public class MirroredQueueUsingVirtualTopicQueueTest extends MirroredQueueTest {
@Override
protected Destination createConsumeDestination() {
String queueName = "Consumer.A.VirtualTopic.Mirror." + getQueueName();
return new ActiveMQQueue(queueName);
}
}