From 328229b42be7bb7592f5b6631454c7ca7ce03050 Mon Sep 17 00:00:00 2001 From: "Hiram R. Chirino" Date: Mon, 22 Oct 2007 18:39:57 +0000 Subject: [PATCH] Fixed the test cases in FanoutTransportBrokerTest also added a fanOutQueues property to enabled fanning out on Queues. See issue: https://issues.apache.org/activemq/browse/AMQ-1464 git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@587188 13f79535-47bb-0310-9956-ffa450edef68 --- .../transport/fanout/FanoutTransport.java | 19 ++++++++++++++++++- .../activemq/network/NetworkTestSupport.java | 2 +- .../fanout/FanoutTransportBrokerTest.java | 17 ++++++++--------- 3 files changed, 27 insertions(+), 11 deletions(-) diff --git a/activemq-core/src/main/java/org/apache/activemq/transport/fanout/FanoutTransport.java b/activemq-core/src/main/java/org/apache/activemq/transport/fanout/FanoutTransport.java index 7c75eebd10..edff4d13aa 100755 --- a/activemq-core/src/main/java/org/apache/activemq/transport/fanout/FanoutTransport.java +++ b/activemq-core/src/main/java/org/apache/activemq/transport/fanout/FanoutTransport.java @@ -76,6 +76,7 @@ public class FanoutTransport implements CompositeTransport { private int maxReconnectAttempts; private Exception connectionFailure; private FanoutTransportHandler primary; + private boolean fanOutQueues; static class RequestCounter { @@ -210,13 +211,18 @@ public class FanoutTransport implements CompositeTransport { primary = fanoutHandler; } t.setTransportListener(fanoutHandler); - connectedCount++; if (started) { restoreTransport(fanoutHandler); } + connectedCount++; } catch (Exception e) { LOG.debug("Connect fail to: " + uri + ", reason: " + e); + if( fanoutHandler.transport !=null ) { + ServiceSupport.dispose(fanoutHandler.transport); + fanoutHandler.transport=null; + } + if (maxReconnectAttempts > 0 && ++fanoutHandler.connectFailures >= maxReconnectAttempts) { LOG.error("Failed to connect to transport after: " + fanoutHandler.connectFailures + " attempt(s)"); connectionFailure = e; @@ -418,6 +424,9 @@ public class FanoutTransport implements CompositeTransport { */ private boolean isFanoutCommand(Command command) { if (command.isMessage()) { + if( fanOutQueues ) { + return true; + } return ((Message)command).getDestination().isTopic(); } if (command.getDataStructureType() == ConsumerInfo.DATA_STRUCTURE_TYPE) { @@ -552,4 +561,12 @@ public class FanoutTransport implements CompositeTransport { return true; } + public boolean isFanOutQueues() { + return fanOutQueues; + } + + public void setFanOutQueues(boolean fanOutQueues) { + this.fanOutQueues = fanOutQueues; + } + } diff --git a/activemq-core/src/test/java/org/apache/activemq/network/NetworkTestSupport.java b/activemq-core/src/test/java/org/apache/activemq/network/NetworkTestSupport.java index 601a447dc3..28dd1ca8c2 100755 --- a/activemq-core/src/test/java/org/apache/activemq/network/NetworkTestSupport.java +++ b/activemq-core/src/test/java/org/apache/activemq/network/NetworkTestSupport.java @@ -151,7 +151,7 @@ public class NetworkTestSupport extends BrokerTestSupport { remoteBroker = createRemoteBroker(remotePersistenceAdapter); remoteBroker.start(); String brokerId = remoteBroker.getBrokerName(); - remoteConnector = new TransportConnector(broker.getBroker(), TransportFactory.bind(brokerId, new URI(getRemoteURI()))); + remoteConnector = new TransportConnector(remoteBroker.getBroker(), TransportFactory.bind(brokerId, new URI(getRemoteURI()))); remoteConnector.start(); BrokerRegistry.getInstance().bind("remotehost", remoteBroker); } diff --git a/activemq-core/src/test/java/org/apache/activemq/transport/fanout/FanoutTransportBrokerTest.java b/activemq-core/src/test/java/org/apache/activemq/transport/fanout/FanoutTransportBrokerTest.java index 9e0e990f61..05b8ce7954 100755 --- a/activemq-core/src/test/java/org/apache/activemq/transport/fanout/FanoutTransportBrokerTest.java +++ b/activemq-core/src/test/java/org/apache/activemq/transport/fanout/FanoutTransportBrokerTest.java @@ -47,8 +47,6 @@ public class FanoutTransportBrokerTest extends NetworkTestSupport { public ActiveMQDestination destination; public int deliveryMode; - private String remoteURI = "tcp://localhost:0?wireFormat.tcpNoDelayEnabled=true"; - public static Test suite() { return suite(FanoutTransportBrokerTest.class); } @@ -59,10 +57,10 @@ public class FanoutTransportBrokerTest extends NetworkTestSupport { public void initCombosForTestPublisherFansout() { addCombinationValues("deliveryMode", new Object[] {Integer.valueOf(DeliveryMode.NON_PERSISTENT), Integer.valueOf(DeliveryMode.PERSISTENT)}); - addCombinationValues("destination", new Object[] {new ActiveMQQueue("TEST"), new ActiveMQTopic("TEST")}); + addCombinationValues("destination", new Object[] {new ActiveMQTopic("TEST")}); } - public void xtestPublisherFansout() throws Exception { + public void testPublisherFansout() throws Exception { // Start a normal consumer on the local broker StubConnection connection1 = createConnection(); @@ -105,7 +103,7 @@ public class FanoutTransportBrokerTest extends NetworkTestSupport { public void initCombosForTestPublisherWaitsForServerToBeUp() { addCombinationValues("deliveryMode", new Object[] {Integer.valueOf(DeliveryMode.NON_PERSISTENT), Integer.valueOf(DeliveryMode.PERSISTENT)}); - addCombinationValues("destination", new Object[] {new ActiveMQQueue("TEST"), new ActiveMQTopic("TEST")}); + addCombinationValues("destination", new Object[] {new ActiveMQTopic("TEST")}); } public void testPublisherWaitsForServerToBeUp() throws Exception { @@ -177,20 +175,21 @@ public class FanoutTransportBrokerTest extends NetworkTestSupport { // Restart the remote server. State should be re-played and the publish // should continue. - remoteURI = remoteConnector.getServer().getConnectURI().toString(); + LOG.info("Restarting Broker"); restartRemoteBroker(); + LOG.info("Broker Restarted"); // This should reconnect, and resend - assertTrue(publishDone.await(10, TimeUnit.SECONDS)); + assertTrue(publishDone.await(20, TimeUnit.SECONDS)); } protected String getLocalURI() { - return "tcp://localhost:0?wireFormat.tcpNoDelayEnabled=true"; + return "tcp://localhost:61616"; } protected String getRemoteURI() { - return remoteURI; + return "tcp://localhost:61617"; } protected StubConnection createFanoutConnection() throws Exception {