Fix for https://issues.apache.org/activemq/browse/AMQ-2086 - duplex connector and excluded destinations

git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@763993 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Bosanac Dejan 2009-04-10 17:58:32 +00:00
parent 3da13732ec
commit d185430ffe
7 changed files with 250 additions and 58 deletions

View File

@ -433,6 +433,15 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br
} else if (command.isBrokerInfo()) {
lastConnectSucceeded.set(true);
remoteBrokerInfo = (BrokerInfo)command;
Properties props = MarshallingSupport.stringToProperties(remoteBrokerInfo.getNetworkProperties());
try {
IntrospectionSupport.getProperties(configuration, props, null);
excludedDestinations = configuration.getExcludedDestinations().toArray(new ActiveMQDestination[configuration.getExcludedDestinations().size()]);
staticallyIncludedDestinations = configuration.getStaticallyIncludedDestinations().toArray(new ActiveMQDestination[configuration.getStaticallyIncludedDestinations().size()]);
dynamicallyIncludedDestinations = configuration.getDynamicallyIncludedDestinations().toArray(new ActiveMQDestination[configuration.getDynamicallyIncludedDestinations().size()]);
} catch (Throwable t) {
LOG.error("Error mapping remote destinations", t);
}
serviceRemoteBrokerInfo(command);
// Let the local broker know the remote broker's ID.
localBroker.oneway(command);

View File

@ -16,6 +16,10 @@
*/
package org.apache.activemq.network;
import java.util.List;
import org.apache.activemq.command.ActiveMQDestination;
/**
* Configuration for a NetworkBridge
*
@ -36,8 +40,14 @@ public class NetworkBridgeConfiguration {
private String password;
private String destinationFilter = ">";
private String name = null;
private List<ActiveMQDestination> excludedDestinations;
private List<ActiveMQDestination> dynamicallyIncludedDestinations;
private List<ActiveMQDestination> staticallyIncludedDestinations;
private boolean suppressDuplicateQueueSubscriptions = false;
/**
* @return the conduitSubscriptions
*/
@ -224,6 +234,35 @@ public class NetworkBridgeConfiguration {
this.name = name;
}
public List<ActiveMQDestination> getExcludedDestinations() {
return excludedDestinations;
}
public void setExcludedDestinations(
List<ActiveMQDestination> excludedDestinations) {
this.excludedDestinations = excludedDestinations;
}
public List<ActiveMQDestination> getDynamicallyIncludedDestinations() {
return dynamicallyIncludedDestinations;
}
public void setDynamicallyIncludedDestinations(
List<ActiveMQDestination> dynamicallyIncludedDestinations) {
this.dynamicallyIncludedDestinations = dynamicallyIncludedDestinations;
}
public List<ActiveMQDestination> getStaticallyIncludedDestinations() {
return staticallyIncludedDestinations;
}
public void setStaticallyIncludedDestinations(
List<ActiveMQDestination> staticallyIncludedDestinations) {
this.staticallyIncludedDestinations = staticallyIncludedDestinations;
}
public boolean isSuppressDuplicateQueueSubscriptions() {
return suppressDuplicateQueueSubscriptions;
}

View File

@ -21,8 +21,6 @@ import java.beans.PropertyEditorManager;
import java.lang.reflect.Field;
import java.lang.reflect.Method;
import java.lang.reflect.Modifier;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Iterator;
@ -32,8 +30,23 @@ import java.util.Set;
import java.util.Map.Entry;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.commons.lang.ArrayUtils;
public final class IntrospectionSupport {
static {
// find Spring and ActiveMQ specific property editors
String[] searchPath = (String[])ArrayUtils.addAll(
PropertyEditorManager.getEditorSearchPath(),
new String[] {
"org.springframework.beans.propertyeditors"
, "org.apache.activemq.util"
}
);
PropertyEditorManager.setEditorSearchPath(searchPath);
}
private IntrospectionSupport() {
}
@ -177,27 +190,21 @@ public final class IntrospectionSupport {
}
}
private static Object convert(Object value, Class type) throws URISyntaxException {
private static Object convert(Object value, Class type) {
PropertyEditor editor = PropertyEditorManager.findEditor(type);
if (editor != null) {
editor.setAsText(value.toString());
return editor.getValue();
}
if (type == URI.class) {
return new URI(value.toString());
}
return null;
}
private static String convertToString(Object value, Class type) throws URISyntaxException {
public static String convertToString(Object value, Class type) {
PropertyEditor editor = PropertyEditorManager.findEditor(type);
if (editor != null) {
editor.setValue(value);
return editor.getAsText();
}
if (type == URI.class) {
return ((URI)value).toString();
}
return null;
}
@ -219,12 +226,7 @@ public final class IntrospectionSupport {
if (PropertyEditorManager.findEditor(clazz) != null) {
return true;
}
if (clazz == URI.class) {
return true;
}
if (clazz == Boolean.class) {
return true;
}
return false;
}

View File

@ -0,0 +1,48 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.util;
import java.beans.PropertyEditorSupport;
import java.util.ArrayList;
import org.apache.activemq.command.ActiveMQDestination;
import org.springframework.util.StringUtils;
/**
* Used to serialize lists of ActiveMQDestinations.
* @see org.apache.activemq.util.IntrospectionSupport
*/
public class ListEditor extends PropertyEditorSupport {
public static final String DEFAULT_SEPARATOR = ",";
public String getAsText() {
return getValue().toString();
}
public void setAsText(String text) throws IllegalArgumentException {
text = text.substring(1, text.length() - 1);
String[] array = StringUtils.delimitedListToStringArray(text, ListEditor.DEFAULT_SEPARATOR, null);
ArrayList<ActiveMQDestination> list = new ArrayList<ActiveMQDestination>();
for (String item : array) {
list.add(ActiveMQDestination.createDestination(item.trim(), ActiveMQDestination.QUEUE_TYPE));
}
setValue(list);
}
}

View File

@ -36,65 +36,92 @@ import org.apache.activemq.broker.BrokerFactory;
import org.apache.activemq.broker.BrokerService;
public class TestBrokerConnectionDuplexExcludedDestinations extends TestCase {
public void testBrokerConnectionDuplexPropertiesPropagation()
throws Exception {
BrokerService receiverBroker;
BrokerService senderBroker;
Connection hubConnection;
Session hubSession;
Connection spokeConnection;
Session spokeSession;
public void setUp() throws Exception {
// Hub broker
String configFileName = "org/apache/activemq/usecases/receiver-duplex.xml";
URI uri = new URI("xbean:" + configFileName);
BrokerService receiverBroker = BrokerFactory.createBroker(uri);
receiverBroker = BrokerFactory.createBroker(uri);
receiverBroker.setPersistent(false);
receiverBroker.setBrokerName("Hub");
// Spoke broker
configFileName = "org/apache/activemq/usecases/sender-duplex.xml";
uri = new URI("xbean:" + configFileName);
BrokerService senderBroker = BrokerFactory.createBroker(uri);
senderBroker = BrokerFactory.createBroker(uri);
senderBroker.setPersistent(false);
receiverBroker.setBrokerName("Spoke");
senderBroker.setBrokerName("Spoke");
// Start both Hub and Spoke broker
receiverBroker.start();
senderBroker.start();
// create hub session
ConnectionFactory cfHub = new ActiveMQConnectionFactory("tcp://localhost:62002");
final ConnectionFactory cfHub = new ActiveMQConnectionFactory(
"tcp://localhost:62002");
final Connection hubConnection = cfHub.createConnection();
hubConnection = cfHub.createConnection();
hubConnection.start();
final Session hubSession = hubConnection.createSession(false,
Session.AUTO_ACKNOWLEDGE);
final MessageProducer hubProducer = hubSession.createProducer(null);
hubSession = hubConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
// create spoke session
ConnectionFactory cfSpoke = new ActiveMQConnectionFactory("tcp://localhost:62001");
spokeConnection = cfSpoke.createConnection();
spokeConnection.start();
spokeSession = spokeConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
}
public void tearDown() throws Exception {
hubSession.close();
hubConnection.stop();
hubConnection.close();
spokeSession.close();
spokeConnection.stop();
spokeConnection.close();
senderBroker.stop();
receiverBroker.stop();
}
public void testDuplexSendFromHubToSpoke()
throws Exception {
//create hub producer
MessageProducer hubProducer = hubSession.createProducer(null);
hubProducer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
hubProducer.setDisableMessageID(true);
hubProducer.setDisableMessageTimestamp(true);
final Queue excludedQueueHub = hubSession.createQueue("exclude.test.foo");
final TextMessage excludedMsgHub = hubSession.createTextMessage();
Queue excludedQueueHub = hubSession.createQueue("exclude.test.foo");
TextMessage excludedMsgHub = hubSession.createTextMessage();
excludedMsgHub.setText(excludedQueueHub.toString());
final Queue includedQueueHub = hubSession.createQueue("include.test.foo");
Queue includedQueueHub = hubSession.createQueue("include.test.foo");
final TextMessage includedMsgHub = hubSession.createTextMessage();
excludedMsgHub.setText(includedQueueHub.toString());
TextMessage includedMsgHub = hubSession.createTextMessage();
includedMsgHub.setText(includedQueueHub.toString());
// Sending from Hub queue
hubProducer.send(excludedQueueHub, excludedMsgHub);
hubProducer.send(includedQueueHub, includedMsgHub);
final ConnectionFactory cfSpoke = new ActiveMQConnectionFactory(
"tcp://localhost:62001");
final Connection spokeConnection = cfSpoke.createConnection();
spokeConnection.start();
final Session spokeSession = spokeConnection.createSession(false,
Session.AUTO_ACKNOWLEDGE);
final Queue excludedQueueSpoke = spokeSession.createQueue("exclude.test.foo");
final MessageConsumer excludedConsumerSpoke = spokeSession
.createConsumer(excludedQueueSpoke);
final Queue includedQueueSpoke = spokeSession.createQueue("include.test.foo");
final MessageConsumer includedConsumerSpoke = spokeSession
.createConsumer(includedQueueSpoke);
Queue excludedQueueSpoke = spokeSession.createQueue("exclude.test.foo");
MessageConsumer excludedConsumerSpoke = spokeSession.createConsumer(excludedQueueSpoke);
Thread.sleep(100);
Queue includedQueueSpoke = spokeSession.createQueue("include.test.foo");
MessageConsumer includedConsumerSpoke = spokeSession.createConsumer(includedQueueSpoke);
// Receiving from excluded Spoke queue
Message msg = excludedConsumerSpoke.receive(200);
@ -102,19 +129,16 @@ public class TestBrokerConnectionDuplexExcludedDestinations extends TestCase {
// Receiving from included Spoke queue
msg = includedConsumerSpoke.receive(200);
assertEquals(msg, includedMsgHub);
assertEquals(includedMsgHub, msg);
// we should be able to receive excluded queue message on Hub
MessageConsumer excludedConsumerHub = hubSession.createConsumer(excludedQueueHub);
msg = excludedConsumerHub.receive(200);;
assertEquals(excludedMsgHub, msg);
excludedConsumerSpoke.close();
hubSession.close();
hubConnection.stop();
hubConnection.close();
hubProducer.close();
spokeSession.close();
spokeConnection.stop();
spokeConnection.close();
senderBroker.stop();
receiverBroker.stop();
excludedConsumerSpoke.close();
}
}

View File

@ -18,11 +18,29 @@ package org.apache.activemq.util;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Properties;
import junit.framework.TestCase;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.ActiveMQTopic;
public class ReflectionSupportTest extends TestCase {
List<ActiveMQDestination> favorites = new ArrayList<ActiveMQDestination>();
String favoritesString = "[queue://test, topic://test]";
List<ActiveMQDestination> nonFavorites = new ArrayList<ActiveMQDestination>();
String nonFavoritesString = "[topic://test1]";
public void setUp() {
favorites.add(new ActiveMQQueue("test"));
favorites.add(new ActiveMQTopic("test"));
nonFavorites.add(new ActiveMQTopic("test1"));
}
public void testSetProperties() throws URISyntaxException {
SimplePojo pojo = new SimplePojo();
@ -30,7 +48,10 @@ public class ReflectionSupportTest extends TestCase {
map.put("age", "27");
map.put("name", "Hiram");
map.put("enabled", "true");
map.put("uri", "test://value");
map.put("uri", "test://value");
map.put("favorites", favoritesString);
map.put("nonFavorites", nonFavoritesString);
map.put("others", null);
IntrospectionSupport.setProperties(pojo, map);
@ -38,5 +59,29 @@ public class ReflectionSupportTest extends TestCase {
assertEquals("Hiram", pojo.getName());
assertEquals(true, pojo.isEnabled());
assertEquals(new URI("test://value"), pojo.getUri());
assertEquals(favorites, pojo.getFavorites());
assertEquals(nonFavorites, pojo.getNonFavorites());
assertNull(pojo.getOthers());
}
public void testGetProperties() {
SimplePojo pojo = new SimplePojo();
pojo.setAge(31);
pojo.setName("Dejan");
pojo.setEnabled(true);
pojo.setFavorites(favorites);
pojo.setNonFavorites(nonFavorites);
pojo.setOthers(null);
Properties props = new Properties();
IntrospectionSupport.getProperties(pojo, props, null);
assertEquals("Dejan", props.get("name"));
assertEquals("31", props.get("age"));
assertEquals("True", props.get("enabled"));
assertEquals(favoritesString, props.get("favorites"));
assertEquals(nonFavoritesString, props.get("nonFavorites"));
assertNull(props.get("others"));
}
}

View File

@ -17,6 +17,10 @@
package org.apache.activemq.util;
import java.net.URI;
import java.util.ArrayList;
import java.util.List;
import org.apache.activemq.command.ActiveMQDestination;
public class SimplePojo {
@ -24,6 +28,9 @@ public class SimplePojo {
int age;
boolean enabled;
URI uri;
List<ActiveMQDestination> favorites = new ArrayList<ActiveMQDestination>();
List<ActiveMQDestination> nonFavorites = new ArrayList<ActiveMQDestination>();
List<ActiveMQDestination> others = new ArrayList<ActiveMQDestination>();
public int getAge() {
return age;
@ -49,5 +56,23 @@ public class SimplePojo {
public void setUri(URI uri) {
this.uri = uri;
}
public List<ActiveMQDestination> getFavorites() {
return favorites;
}
public void setFavorites(List<ActiveMQDestination> favorites) {
this.favorites = favorites;
}
public List<ActiveMQDestination> getNonFavorites() {
return nonFavorites;
}
public void setNonFavorites(List<ActiveMQDestination> nonFavorites) {
this.nonFavorites = nonFavorites;
}
public List<ActiveMQDestination> getOthers() {
return others;
}
public void setOthers(List<ActiveMQDestination> others) {
this.others = others;
}
}