diff --git a/activemq-core/pom.xml b/activemq-core/pom.xml
index 4de37c559f..9f9511a14a 100755
--- a/activemq-core/pom.xml
+++ b/activemq-core/pom.xml
@@ -449,9 +449,6 @@
**/load/*
-
- **/FanoutTransportBrokerTest.*
-
**/MultipleTestsWithSpringFactoryBeanTest.*
**/MultipleTestsWithXBeanFactoryBeanTest.*
diff --git a/activemq-core/src/main/java/org/apache/activemq/transport/fanout/FanoutTransport.java b/activemq-core/src/main/java/org/apache/activemq/transport/fanout/FanoutTransport.java
index a736daf921..f0efb71f3a 100755
--- a/activemq-core/src/main/java/org/apache/activemq/transport/fanout/FanoutTransport.java
+++ b/activemq-core/src/main/java/org/apache/activemq/transport/fanout/FanoutTransport.java
@@ -23,9 +23,11 @@ import java.util.ArrayList;
import java.util.Iterator;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
+
import org.apache.activemq.command.Command;
import org.apache.activemq.command.ConsumerInfo;
import org.apache.activemq.command.Message;
+import org.apache.activemq.command.RemoveInfo;
import org.apache.activemq.command.Response;
import org.apache.activemq.state.ConnectionStateTracker;
import org.apache.activemq.thread.DefaultThreadPools;
@@ -130,7 +132,7 @@ public class FanoutTransport implements CompositeTransport {
public void onException(IOException error) {
try {
synchronized (reconnectMutex) {
- if (transport == null) {
+ if (transport == null || !transport.isConnected()) {
return;
}
@@ -434,7 +436,8 @@ public class FanoutTransport implements CompositeTransport {
}
return ((Message)command).getDestination().isTopic();
}
- if (command.getDataStructureType() == ConsumerInfo.DATA_STRUCTURE_TYPE) {
+ if (command.getDataStructureType() == ConsumerInfo.DATA_STRUCTURE_TYPE ||
+ command.getDataStructureType() == RemoveInfo.DATA_STRUCTURE_TYPE) {
return false;
}
return true;
diff --git a/activemq-core/src/test/java/org/apache/activemq/transport/fanout/FanoutTest.java b/activemq-core/src/test/java/org/apache/activemq/transport/fanout/FanoutTest.java
new file mode 100644
index 0000000000..62104d49de
--- /dev/null
+++ b/activemq-core/src/test/java/org/apache/activemq/transport/fanout/FanoutTest.java
@@ -0,0 +1,97 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.transport.fanout;
+
+import javax.jms.Connection;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+
+import junit.framework.TestCase;
+
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.broker.BrokerFactory;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.util.MessageIdList;
+
+public class FanoutTest extends TestCase {
+
+ BrokerService broker1;
+ BrokerService broker2;
+
+ ActiveMQConnectionFactory producerFactory = new ActiveMQConnectionFactory("fanout:(static:(tcp://localhost:61616,tcp://localhost:61617))?fanOutQueues=true");
+ Connection producerConnection;
+ Session producerSession;
+ int messageCount = 100;
+
+ public void setUp() throws Exception {
+ broker1 = BrokerFactory.createBroker("broker:(tcp://localhost:61616)/brokerA?persistent=false&useJmx=false");
+ broker2 = BrokerFactory.createBroker("broker:(tcp://localhost:61617)/brokerB?persistent=false&useJmx=false");
+
+ broker1.start();
+ broker2.start();
+
+ broker1.waitUntilStarted();
+ broker2.waitUntilStarted();
+
+ producerConnection = producerFactory.createConnection();
+ producerConnection.start();
+ producerSession = producerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ }
+
+ public void tearDown() throws Exception {
+ producerSession.close();
+ producerConnection.close();
+
+ broker1.stop();
+ broker2.stop();
+ }
+
+ public void testSendReceive() throws Exception {
+
+ MessageProducer prod = createProducer();
+ for (int i = 0; i < messageCount; i++) {
+ Message msg = producerSession.createTextMessage("Message " + i);
+ prod.send(msg);
+ }
+ prod.close();
+
+ assertMessagesReceived("tcp://localhost:61616");
+ assertMessagesReceived("tcp://localhost:61617");
+
+ }
+
+ protected MessageProducer createProducer() throws Exception {
+ return producerSession.createProducer(producerSession.createQueue("TEST"));
+ }
+
+ protected void assertMessagesReceived(String brokerURL) throws Exception {
+ ActiveMQConnectionFactory consumerFactory = new ActiveMQConnectionFactory(brokerURL);
+ Connection consumerConnection = consumerFactory.createConnection();
+ consumerConnection.start();
+ Session consumerSession = consumerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ MessageConsumer consumer = consumerSession.createConsumer(consumerSession.createQueue("TEST"));
+ MessageIdList listener = new MessageIdList();
+ consumer.setMessageListener(listener);
+ listener.waitForMessagesToArrive(messageCount);
+ listener.assertMessagesReceived(messageCount);
+
+ consumer.close(); consumerConnection.close(); consumerSession.close();
+ }
+}