From 7d8ce02dee5b71ac7a90bd82f4a39bf17e1842cf Mon Sep 17 00:00:00 2001 From: "Timothy A. Bish" Date: Thu, 29 Sep 2011 21:42:29 +0000 Subject: [PATCH] fix for: https://issues.apache.org/jira/browse/AMQ-3517 Adds tests for the backup option and cleans up some other the other tests. git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@1177437 13f79535-47bb-0310-9956-ffa450edef68 --- .../transport/failover/FailoverTransport.java | 114 +++++---- .../failover/FailoverClusterTest.java | 4 +- ...FailoverConsumerOutstandingCommitTest.java | 5 +- .../FailoverConsumerUnconsumedTest.java | 1 + .../failover/FailoverPrefetchZeroTest.java | 1 + .../failover/FailoverTransactionTest.java | 25 +- .../FailoverTransportBackupsTest.java | 222 ++++++++++++++++++ .../failover/FailoverTransportBrokerTest.java | 3 +- 8 files changed, 306 insertions(+), 69 deletions(-) create mode 100644 activemq-core/src/test/java/org/apache/activemq/transport/failover/FailoverTransportBackupsTest.java diff --git a/activemq-core/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java b/activemq-core/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java index 705461794d..dceba66706 100755 --- a/activemq-core/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java +++ b/activemq-core/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java @@ -431,6 +431,10 @@ public class FailoverTransport implements CompositeTransport { this.backupPoolSize = backupPoolSize; } + public int getCurrentBackups() { + return this.backups.size(); + } + public boolean isTrackMessages() { return trackMessages; } @@ -470,11 +474,11 @@ public class FailoverTransport implements CompositeTransport { } else if (command instanceof RemoveInfo || command.isMessageAck()) { // Simulate response to RemoveInfo command or MessageAck (as it will be stale) stateTracker.track(command); - if (command.isResponseRequired()) { - Response response = new Response(); - response.setCorrelationId(command.getCommandId()); - myTransportListener.onCommand(response); - } + if (command.isResponseRequired()) { + Response response = new Response(); + response.setCorrelationId(command.getCommandId()); + myTransportListener.onCommand(response); + } return; } } @@ -489,18 +493,24 @@ public class FailoverTransport implements CompositeTransport { boolean timedout = false; while (transport == null && !disposed && connectionFailure == null && !Thread.currentThread().isInterrupted()) { - LOG.trace("Waiting for transport to reconnect..: " + command); + if (LOG.isTraceEnabled()) { + LOG.trace("Waiting for transport to reconnect..: " + command); + } long end = System.currentTimeMillis(); if (timeout > 0 && (end - start > timeout)) { timedout = true; - LOG.info("Failover timed out after " + (end - start) + "ms"); + if (LOG.isInfoEnabled()) { + LOG.info("Failover timed out after " + (end - start) + "ms"); + } break; } try { reconnectMutex.wait(100); } catch (InterruptedException e) { Thread.currentThread().interrupt(); - LOG.debug("Interupted: " + e, e); + if (LOG.isDebugEnabled()) { + LOG.debug("Interupted: " + e, e); + } } transport = connectedTransport.get(); } @@ -572,6 +582,7 @@ public class FailoverTransport implements CompositeTransport { Thread.currentThread().interrupt(); throw new InterruptedIOException(); } + if (!disposed) { if (error != null) { if (error instanceof IOException) { @@ -738,7 +749,6 @@ public class FailoverTransport implements CompositeTransport { } private void doUpdateURIsFromDisk() { - // If updateURIsURL is specified, read the file and add any new // transport URI's to this FailOverTransport. // Note: Could track file timestamp to avoid unnecessary reading. @@ -814,35 +824,26 @@ public class FailoverTransport implements CompositeTransport { } doRebalance = false; } + if (!useExponentialBackOff || reconnectDelay == DEFAULT_INITIAL_RECONNECT_DELAY) { reconnectDelay = initialReconnectDelay; } + + Transport transport = null; + URI uri = null; + + // If we have a backup already waiting lets try it. synchronized (backupMutex) { if (backup && !backups.isEmpty()) { BackupTransport bt = backups.remove(0); - Transport t = bt.getTransport(); - URI uri = bt.getUri(); - t.setTransportListener(myTransportListener); - try { - if (started) { - restoreTransport(t); - } - reconnectDelay = initialReconnectDelay; - failedConnectTransportURI = null; - connectedTransportURI = uri; - connectedTransport.set(t); - reconnectMutex.notifyAll(); - connectFailures = 0; - LOG.info("Successfully reconnected to backup " + uri); - return false; - } catch (Exception e) { - LOG.debug("Backup transport failed", e); - } + transport = bt.getTransport(); + uri = bt.getUri(); } } - // Sleep for the reconnectDelay - if (!firstConnection && (reconnectDelay > 0) && !disposed) { + // Sleep for the reconnectDelay if there's no backup and we aren't trying + // for the first time, or we were disposed for some reason. + if (transport == null && !firstConnection && (reconnectDelay > 0) && !disposed) { synchronized (sleepMutex) { LOG.debug("Waiting " + reconnectDelay + " ms before attempting connection. "); try { @@ -854,63 +855,76 @@ public class FailoverTransport implements CompositeTransport { } Iterator iter = connectList.iterator(); - while (iter.hasNext() && connectedTransport.get() == null && !disposed) { + while ((transport != null || iter.hasNext()) && (connectedTransport.get() == null && !disposed)) { - URI uri = iter.next(); - Transport t = null; try { + SslContext.setCurrentSslContext(brokerSslContext); + + // We could be starting with a backup and if so we wait to grab a + // URI from the pool until next time around. + if (transport == null) { + uri = iter.next(); + transport = TransportFactory.compositeConnect(uri); + } + if (LOG.isDebugEnabled()) { LOG.debug("Attempting connect to: " + uri); } - SslContext.setCurrentSslContext(brokerSslContext); - t = TransportFactory.compositeConnect(uri); - t.setTransportListener(myTransportListener); - t.start(); + transport.setTransportListener(myTransportListener); + transport.start(); if (started) { - restoreTransport(t); + restoreTransport(transport); } LOG.debug("Connection established"); reconnectDelay = initialReconnectDelay; connectedTransportURI = uri; - connectedTransport.set(t); + connectedTransport.set(transport); reconnectMutex.notifyAll(); connectFailures = 0; - // Make sure on initial startup, that the - // transportListener + + // Make sure on initial startup, that the transportListener // has been initialized for this instance. synchronized (listenerMutex) { if (transportListener == null) { try { - // if it isn't set after 2secs - it - // probably never will be + // if it isn't set after 2secs - it probably never will be listenerMutex.wait(2000); } catch (InterruptedException ex) { } } } + if (transportListener != null) { transportListener.transportResumed(); } else { LOG.debug("transport resumed by transport listener not set"); } + if (firstConnection) { firstConnection = false; LOG.info("Successfully connected to " + uri); } else { LOG.info("Successfully reconnected to " + uri); } + connected = true; return false; } catch (Exception e) { failure = e; - LOG.debug("Connect fail to: " + uri + ", reason: " + e); - if (t != null) { + if (LOG.isDebugEnabled()) { + LOG.debug("Connect fail to: " + uri + ", reason: " + e); + } + if (transport != null) { try { - t.stop(); + transport.stop(); + transport = null; } catch (Exception ee) { - LOG.debug("Stop of failed transport: " + t + " failed with reason: " + ee); + if (LOG.isDebugEnabled()) { + LOG.debug("Stop of failed transport: " + transport + + " failed with reason: " + ee); + } } } } finally { @@ -919,21 +933,24 @@ public class FailoverTransport implements CompositeTransport { } } } + int reconnectAttempts = 0; if (firstConnection) { if (this.startupMaxReconnectAttempts != 0) { reconnectAttempts = this.startupMaxReconnectAttempts; } } + if (reconnectAttempts == 0) { reconnectAttempts = this.maxReconnectAttempts; } + if (reconnectAttempts > 0 && ++connectFailures >= reconnectAttempts) { LOG.error("Failed to connect to transport after: " + connectFailures + " attempt(s)"); connectionFailure = failure; - // Make sure on initial startup, that the transportListener has - // been initialized for this instance. + // Make sure on initial startup, that the transportListener has been + // initialized for this instance. synchronized (listenerMutex) { if (transportListener == null) { try { @@ -1122,7 +1139,6 @@ public class FailoverTransport implements CompositeTransport { } private boolean contains(URI newURI) { - boolean result = false; try { for (URI uri : uris) { diff --git a/activemq-core/src/test/java/org/apache/activemq/transport/failover/FailoverClusterTest.java b/activemq-core/src/test/java/org/apache/activemq/transport/failover/FailoverClusterTest.java index 9905778c52..417516f03d 100644 --- a/activemq-core/src/test/java/org/apache/activemq/transport/failover/FailoverClusterTest.java +++ b/activemq-core/src/test/java/org/apache/activemq/transport/failover/FailoverClusterTest.java @@ -32,7 +32,6 @@ import org.apache.activemq.broker.BrokerService; import org.apache.activemq.broker.TransportConnector; import org.apache.activemq.network.NetworkConnector; - public class FailoverClusterTest extends TestCase { private static final int NUMBER = 10; @@ -45,7 +44,6 @@ public class FailoverClusterTest extends TestCase { private final List connections = new ArrayList(); - public void testClusterConnectedAfterClients() throws Exception { createClients(); if (brokerB == null) { @@ -73,7 +71,6 @@ public class FailoverClusterTest extends TestCase { assertTrue(set.size() > 1); } - public void testClusterConnectedBeforeClients() throws Exception { if (brokerB == null) { @@ -151,6 +148,7 @@ public class FailoverClusterTest extends TestCase { answer.setUseShutdownHook(false); } + @SuppressWarnings("unused") protected void createClients() throws Exception { ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(clientUrl); for (int i = 0; i < NUMBER; i++) { diff --git a/activemq-core/src/test/java/org/apache/activemq/transport/failover/FailoverConsumerOutstandingCommitTest.java b/activemq-core/src/test/java/org/apache/activemq/transport/failover/FailoverConsumerOutstandingCommitTest.java index ff38fd9ebc..efd55a8321 100644 --- a/activemq-core/src/test/java/org/apache/activemq/transport/failover/FailoverConsumerOutstandingCommitTest.java +++ b/activemq-core/src/test/java/org/apache/activemq/transport/failover/FailoverConsumerOutstandingCommitTest.java @@ -100,6 +100,7 @@ public class FailoverConsumerOutstandingCommitTest { doTestFailoverConsumerDups(true); } + @SuppressWarnings("unchecked") public void doTestFailoverConsumerDups(final boolean watchTopicAdvisories) throws Exception { broker = createBroker(true); @@ -138,7 +139,6 @@ public class FailoverConsumerOutstandingCommitTest { final Session consumerSession = connection.createSession(true, Session.AUTO_ACKNOWLEDGE); - final CountDownLatch commitDoneLatch = new CountDownLatch(1); final CountDownLatch messagesReceived = new CountDownLatch(2); @@ -196,6 +196,7 @@ public class FailoverConsumerOutstandingCommitTest { doTestFailoverConsumerOutstandingSendTx(true); } + @SuppressWarnings("unchecked") public void doTestFailoverConsumerOutstandingSendTx(final boolean doActualBrokerCommit) throws Exception { final boolean watchTopicAdvisories = true; broker = createBroker(true); @@ -240,7 +241,6 @@ public class FailoverConsumerOutstandingCommitTest { final Queue signalDestination = producerSession.createQueue(QUEUE_NAME + ".signal" + "?consumer.prefetchSize=" + prefetch); - final Session consumerSession = connection.createSession(true, Session.SESSION_TRANSACTED); final CountDownLatch commitDoneLatch = new CountDownLatch(1); @@ -295,7 +295,6 @@ public class FailoverConsumerOutstandingCommitTest { assertTrue("another message was received", messagesReceived.await(20, TimeUnit.SECONDS)); assertEquals("get message 1 eventually", MESSAGE_TEXT + "1", receivedMessages.get(2).getText()); - connection.close(); } diff --git a/activemq-core/src/test/java/org/apache/activemq/transport/failover/FailoverConsumerUnconsumedTest.java b/activemq-core/src/test/java/org/apache/activemq/transport/failover/FailoverConsumerUnconsumedTest.java index c743615761..0bd3ee51b9 100644 --- a/activemq-core/src/test/java/org/apache/activemq/transport/failover/FailoverConsumerUnconsumedTest.java +++ b/activemq-core/src/test/java/org/apache/activemq/transport/failover/FailoverConsumerUnconsumedTest.java @@ -95,6 +95,7 @@ public class FailoverConsumerUnconsumedTest { doTestFailoverConsumerDups(false); } + @SuppressWarnings("unchecked") public void doTestFailoverConsumerDups(final boolean watchTopicAdvisories) throws Exception { final int maxConsumers = 4; diff --git a/activemq-core/src/test/java/org/apache/activemq/transport/failover/FailoverPrefetchZeroTest.java b/activemq-core/src/test/java/org/apache/activemq/transport/failover/FailoverPrefetchZeroTest.java index 970020e0d3..154c1968c7 100644 --- a/activemq-core/src/test/java/org/apache/activemq/transport/failover/FailoverPrefetchZeroTest.java +++ b/activemq-core/src/test/java/org/apache/activemq/transport/failover/FailoverPrefetchZeroTest.java @@ -79,6 +79,7 @@ public class FailoverPrefetchZeroTest { return broker; } + @SuppressWarnings("unchecked") @Test public void testPrefetchZeroConsumerThroughRestart() throws Exception { broker = createBroker(true); diff --git a/activemq-core/src/test/java/org/apache/activemq/transport/failover/FailoverTransactionTest.java b/activemq-core/src/test/java/org/apache/activemq/transport/failover/FailoverTransactionTest.java index 854e5ecf38..bffbb8961a 100644 --- a/activemq-core/src/test/java/org/apache/activemq/transport/failover/FailoverTransactionTest.java +++ b/activemq-core/src/test/java/org/apache/activemq/transport/failover/FailoverTransactionTest.java @@ -140,6 +140,7 @@ public class FailoverTransactionTest extends TestSupport { new Object[]{PersistenceAdapterChoice.KahaDB, PersistenceAdapterChoice.AMQ, PersistenceAdapterChoice.JDBC}); } + @SuppressWarnings("unchecked") public void testFailoverCommitReplyLost() throws Exception { broker = createBroker(true); @@ -234,15 +235,15 @@ public class FailoverTransactionTest extends TestSupport { connection.close(); } - public void initCombosForTestFailoverSendReplyLost() { addCombinationValues("defaultPersistenceAdapter", - new Object[]{PersistenceAdapterChoice.KahaDB, - PersistenceAdapterChoice.JDBC - // not implemented for AMQ store - }); + new Object[]{PersistenceAdapterChoice.KahaDB, + PersistenceAdapterChoice.JDBC + // not implemented for AMQ store + }); } + @SuppressWarnings("unchecked") public void testFailoverSendReplyLost() throws Exception { broker = createBroker(true); @@ -341,15 +342,15 @@ public class FailoverTransactionTest extends TestSupport { connection.close(); } - public void initCombosForTestFailoverConnectionSendReplyLost() { addCombinationValues("defaultPersistenceAdapter", - new Object[]{PersistenceAdapterChoice.KahaDB, - PersistenceAdapterChoice.JDBC - // last producer message id store feature not implemented for AMQ store - }); + new Object[]{PersistenceAdapterChoice.KahaDB, + PersistenceAdapterChoice.JDBC + // last producer message id store feature not implemented for AMQ store + }); } + @SuppressWarnings("unchecked") public void testFailoverConnectionSendReplyLost() throws Exception { broker = createBroker(true); @@ -579,6 +580,7 @@ public class FailoverTransactionTest extends TestSupport { } } + @SuppressWarnings("unchecked") public void doTestFailoverConsumerAckLost(final int pauseSeconds) throws Exception { broker = createBroker(true); setDefaultPersistenceAdapter(broker); @@ -681,7 +683,6 @@ public class FailoverTransactionTest extends TestSupport { } }); - // will be stopped by the plugin broker.waitUntilStopped(); broker = createBroker(false, url); @@ -776,7 +777,6 @@ public class FailoverTransactionTest extends TestSupport { connection.close(); } - public void testWaitForMissingRedeliveries() throws Exception { LOG.info("testWaitForMissingRedeliveries()"); broker = createBroker(true); @@ -825,7 +825,6 @@ public class FailoverTransactionTest extends TestSupport { connection.close(); } - public void testPoisonOnDeliveryWhilePending() throws Exception { LOG.info("testPoisonOnDeliveryWhilePending()"); broker = createBroker(true); diff --git a/activemq-core/src/test/java/org/apache/activemq/transport/failover/FailoverTransportBackupsTest.java b/activemq-core/src/test/java/org/apache/activemq/transport/failover/FailoverTransportBackupsTest.java new file mode 100644 index 0000000000..bcfa48ea77 --- /dev/null +++ b/activemq-core/src/test/java/org/apache/activemq/transport/failover/FailoverTransportBackupsTest.java @@ -0,0 +1,222 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.transport.failover; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; + +import java.io.IOException; +import java.net.URI; + +import org.apache.activemq.broker.BrokerService; +import org.apache.activemq.transport.Transport; +import org.apache.activemq.transport.TransportFactory; +import org.apache.activemq.transport.TransportListener; +import org.apache.activemq.util.Wait; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class FailoverTransportBackupsTest { + + private static final Logger LOG = LoggerFactory.getLogger(FailoverTransportBackupsTest.class); + + protected Transport transport; + protected FailoverTransport failoverTransport; + private int commandsReceived; + private int exceptionReceived; + private int transportInterruptions; + private int transportResumptions; + + BrokerService broker1; + BrokerService broker2; + BrokerService broker3; + + @Before + public void setUp() throws Exception { + broker1 = createBroker("1"); + broker2 = createBroker("2"); + broker3 = createBroker("3"); + + broker1.start(); + broker2.start(); + broker3.start(); + + broker1.waitUntilStarted(); + broker2.waitUntilStarted(); + broker3.waitUntilStarted(); + + // Reset stats + commandsReceived = 0; + exceptionReceived = 0; + transportInterruptions = 0; + transportResumptions = 0; + } + + @After + public void tearDown() throws Exception { + if (transport != null) { + transport.stop(); + } + + broker1.stop(); + broker1.waitUntilStopped(); + broker2.stop(); + broker2.waitUntilStopped(); + broker3.stop(); + broker3.waitUntilStopped(); + } + + @Test + public void testBackupsAreCreated() throws Exception { + this.transport = createTransport(2); + assertNotNull(failoverTransport); + assertTrue(failoverTransport.isBackup()); + assertEquals(2, failoverTransport.getBackupPoolSize()); + + assertTrue("Timed out waiting for Backups to connect.", Wait.waitFor(new Wait.Condition(){ + public boolean isSatisified() throws Exception { + LOG.debug("Current Backup Count = " + failoverTransport.getCurrentBackups()); + return failoverTransport.getCurrentBackups() == 2; + } + })); + } + + @Test + public void testFailoverToBackups() throws Exception { + this.transport = createTransport(2); + assertNotNull(failoverTransport); + assertTrue(failoverTransport.isBackup()); + assertEquals(2, failoverTransport.getBackupPoolSize()); + + assertTrue("Timed out waiting for Backups to connect.", Wait.waitFor(new Wait.Condition(){ + public boolean isSatisified() throws Exception { + LOG.debug("Current Backup Count = " + failoverTransport.getCurrentBackups()); + return failoverTransport.getCurrentBackups() == 2; + } + })); + + broker1.stop(); + + assertTrue("Timed out waiting for Backups to connect.", Wait.waitFor(new Wait.Condition(){ + public boolean isSatisified() throws Exception { + LOG.debug("Current Backup Count = " + failoverTransport.getCurrentBackups()); + return failoverTransport.getCurrentBackups() == 1; + } + })); + + assertTrue("Incorrect number of Transport interruptions", transportInterruptions >= 1); + assertTrue("Incorrect number of Transport resumptions", transportResumptions >= 1); + + broker2.stop(); + + assertTrue("Timed out waiting for Backups to connect.", Wait.waitFor(new Wait.Condition(){ + public boolean isSatisified() throws Exception { + LOG.debug("Current Backup Count = " + failoverTransport.getCurrentBackups()); + return failoverTransport.getCurrentBackups() == 0; + } + })); + + assertTrue("Incorrect number of Transport interruptions", transportInterruptions >= 2); + assertTrue("Incorrect number of Transport resumptions", transportResumptions >= 2); + } + + @Test + public void testBackupsRefilled() throws Exception { + this.transport = createTransport(1); + assertNotNull(failoverTransport); + assertTrue(failoverTransport.isBackup()); + assertEquals(1, failoverTransport.getBackupPoolSize()); + + assertTrue("Timed out waiting for Backups to connect.", Wait.waitFor(new Wait.Condition(){ + public boolean isSatisified() throws Exception { + LOG.debug("Current Backup Count = " + failoverTransport.getCurrentBackups()); + return failoverTransport.getCurrentBackups() == 1; + } + })); + + broker1.stop(); + + assertTrue("Timed out waiting for Backups to connect.", Wait.waitFor(new Wait.Condition(){ + public boolean isSatisified() throws Exception { + LOG.debug("Current Backup Count = " + failoverTransport.getCurrentBackups()); + return failoverTransport.getCurrentBackups() == 1; + } + })); + + broker2.stop(); + + assertTrue("Timed out waiting for Backups to connect.", Wait.waitFor(new Wait.Condition(){ + public boolean isSatisified() throws Exception { + LOG.debug("Current Backup Count = " + failoverTransport.getCurrentBackups()); + return failoverTransport.getCurrentBackups() == 0; + } + })); + } + + private BrokerService createBroker(String name) throws Exception { + BrokerService bs = new BrokerService(); + bs.setBrokerName(name); + bs.setUseJmx(false); + bs.setPersistent(false); + bs.addConnector("tcp://localhost:0"); + return bs; + } + + protected Transport createTransport(int backups) throws Exception { + String connectionUri = "failover://("+ + broker1.getTransportConnectors().get(0).getPublishableConnectString() + "," + + broker2.getTransportConnectors().get(0).getPublishableConnectString() + "," + + broker3.getTransportConnectors().get(0).getPublishableConnectString() + ")"; + + if (backups > 0) { + connectionUri += "?randomize=false&backup=true&backupPoolSize=" + backups; + } + + Transport transport = TransportFactory.connect(new URI(connectionUri)); + transport.setTransportListener(new TransportListener() { + + public void onCommand(Object command) { + LOG.debug("Test Transport Listener received Command: " + command); + commandsReceived++; + } + + public void onException(IOException error) { + LOG.debug("Test Transport Listener received Exception: " + error); + exceptionReceived++; + } + + public void transportInterupted() { + transportInterruptions++; + LOG.debug("Test Transport Listener records transport Interrupted: " + transportInterruptions); + } + + public void transportResumed() { + transportResumptions++; + LOG.debug("Test Transport Listener records transport Resumed: " + transportResumptions); + } + }); + transport.start(); + + this.failoverTransport = transport.narrow(FailoverTransport.class); + + return transport; + } +} diff --git a/activemq-core/src/test/java/org/apache/activemq/transport/failover/FailoverTransportBrokerTest.java b/activemq-core/src/test/java/org/apache/activemq/transport/failover/FailoverTransportBrokerTest.java index 2e8831feac..03b49768fc 100755 --- a/activemq-core/src/test/java/org/apache/activemq/transport/failover/FailoverTransportBrokerTest.java +++ b/activemq-core/src/test/java/org/apache/activemq/transport/failover/FailoverTransportBrokerTest.java @@ -37,7 +37,6 @@ import org.apache.activemq.network.NetworkTestSupport; import org.apache.activemq.transport.Transport; import org.apache.activemq.transport.TransportFactory; import org.apache.activemq.transport.TransportListener; -import org.apache.activemq.transport.multicast.MulticastTransportTest; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -143,6 +142,7 @@ public class FailoverTransportBrokerTest extends NetworkTestSupport { //To change body of implemented methods use File | Settings | File Templates. } }; + @SuppressWarnings("unused") StubConnection c = createFailoverConnection(listener); int count = 0; while(count++ < 20 && info[0] == null) { @@ -160,6 +160,7 @@ public class FailoverTransportBrokerTest extends NetworkTestSupport { return "tcp://localhost:0?wireFormat.tcpNoDelayEnabled=true"; } + @SuppressWarnings("unchecked") protected StubConnection createFailoverConnection(TransportListener listener) throws Exception { URI failoverURI = new URI("failover://" + connector.getServer().getConnectURI() + "," + remoteConnector.getServer().getConnectURI() + ""); Transport transport = TransportFactory.connect(failoverURI);