mirror of https://github.com/apache/activemq.git
Adds tests for the backup option and cleans up some other the other tests. git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@1177437 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
1878e951c6
commit
7d8ce02dee
|
@ -431,6 +431,10 @@ public class FailoverTransport implements CompositeTransport {
|
|||
this.backupPoolSize = backupPoolSize;
|
||||
}
|
||||
|
||||
public int getCurrentBackups() {
|
||||
return this.backups.size();
|
||||
}
|
||||
|
||||
public boolean isTrackMessages() {
|
||||
return trackMessages;
|
||||
}
|
||||
|
@ -470,11 +474,11 @@ public class FailoverTransport implements CompositeTransport {
|
|||
} else if (command instanceof RemoveInfo || command.isMessageAck()) {
|
||||
// Simulate response to RemoveInfo command or MessageAck (as it will be stale)
|
||||
stateTracker.track(command);
|
||||
if (command.isResponseRequired()) {
|
||||
Response response = new Response();
|
||||
response.setCorrelationId(command.getCommandId());
|
||||
myTransportListener.onCommand(response);
|
||||
}
|
||||
if (command.isResponseRequired()) {
|
||||
Response response = new Response();
|
||||
response.setCorrelationId(command.getCommandId());
|
||||
myTransportListener.onCommand(response);
|
||||
}
|
||||
return;
|
||||
}
|
||||
}
|
||||
|
@ -489,18 +493,24 @@ public class FailoverTransport implements CompositeTransport {
|
|||
boolean timedout = false;
|
||||
while (transport == null && !disposed && connectionFailure == null
|
||||
&& !Thread.currentThread().isInterrupted()) {
|
||||
LOG.trace("Waiting for transport to reconnect..: " + command);
|
||||
if (LOG.isTraceEnabled()) {
|
||||
LOG.trace("Waiting for transport to reconnect..: " + command);
|
||||
}
|
||||
long end = System.currentTimeMillis();
|
||||
if (timeout > 0 && (end - start > timeout)) {
|
||||
timedout = true;
|
||||
LOG.info("Failover timed out after " + (end - start) + "ms");
|
||||
if (LOG.isInfoEnabled()) {
|
||||
LOG.info("Failover timed out after " + (end - start) + "ms");
|
||||
}
|
||||
break;
|
||||
}
|
||||
try {
|
||||
reconnectMutex.wait(100);
|
||||
} catch (InterruptedException e) {
|
||||
Thread.currentThread().interrupt();
|
||||
LOG.debug("Interupted: " + e, e);
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("Interupted: " + e, e);
|
||||
}
|
||||
}
|
||||
transport = connectedTransport.get();
|
||||
}
|
||||
|
@ -572,6 +582,7 @@ public class FailoverTransport implements CompositeTransport {
|
|||
Thread.currentThread().interrupt();
|
||||
throw new InterruptedIOException();
|
||||
}
|
||||
|
||||
if (!disposed) {
|
||||
if (error != null) {
|
||||
if (error instanceof IOException) {
|
||||
|
@ -738,7 +749,6 @@ public class FailoverTransport implements CompositeTransport {
|
|||
}
|
||||
|
||||
private void doUpdateURIsFromDisk() {
|
||||
|
||||
// If updateURIsURL is specified, read the file and add any new
|
||||
// transport URI's to this FailOverTransport.
|
||||
// Note: Could track file timestamp to avoid unnecessary reading.
|
||||
|
@ -814,35 +824,26 @@ public class FailoverTransport implements CompositeTransport {
|
|||
}
|
||||
doRebalance = false;
|
||||
}
|
||||
|
||||
if (!useExponentialBackOff || reconnectDelay == DEFAULT_INITIAL_RECONNECT_DELAY) {
|
||||
reconnectDelay = initialReconnectDelay;
|
||||
}
|
||||
|
||||
Transport transport = null;
|
||||
URI uri = null;
|
||||
|
||||
// If we have a backup already waiting lets try it.
|
||||
synchronized (backupMutex) {
|
||||
if (backup && !backups.isEmpty()) {
|
||||
BackupTransport bt = backups.remove(0);
|
||||
Transport t = bt.getTransport();
|
||||
URI uri = bt.getUri();
|
||||
t.setTransportListener(myTransportListener);
|
||||
try {
|
||||
if (started) {
|
||||
restoreTransport(t);
|
||||
}
|
||||
reconnectDelay = initialReconnectDelay;
|
||||
failedConnectTransportURI = null;
|
||||
connectedTransportURI = uri;
|
||||
connectedTransport.set(t);
|
||||
reconnectMutex.notifyAll();
|
||||
connectFailures = 0;
|
||||
LOG.info("Successfully reconnected to backup " + uri);
|
||||
return false;
|
||||
} catch (Exception e) {
|
||||
LOG.debug("Backup transport failed", e);
|
||||
}
|
||||
transport = bt.getTransport();
|
||||
uri = bt.getUri();
|
||||
}
|
||||
}
|
||||
|
||||
// Sleep for the reconnectDelay
|
||||
if (!firstConnection && (reconnectDelay > 0) && !disposed) {
|
||||
// Sleep for the reconnectDelay if there's no backup and we aren't trying
|
||||
// for the first time, or we were disposed for some reason.
|
||||
if (transport == null && !firstConnection && (reconnectDelay > 0) && !disposed) {
|
||||
synchronized (sleepMutex) {
|
||||
LOG.debug("Waiting " + reconnectDelay + " ms before attempting connection. ");
|
||||
try {
|
||||
|
@ -854,63 +855,76 @@ public class FailoverTransport implements CompositeTransport {
|
|||
}
|
||||
|
||||
Iterator<URI> iter = connectList.iterator();
|
||||
while (iter.hasNext() && connectedTransport.get() == null && !disposed) {
|
||||
while ((transport != null || iter.hasNext()) && (connectedTransport.get() == null && !disposed)) {
|
||||
|
||||
URI uri = iter.next();
|
||||
Transport t = null;
|
||||
try {
|
||||
SslContext.setCurrentSslContext(brokerSslContext);
|
||||
|
||||
// We could be starting with a backup and if so we wait to grab a
|
||||
// URI from the pool until next time around.
|
||||
if (transport == null) {
|
||||
uri = iter.next();
|
||||
transport = TransportFactory.compositeConnect(uri);
|
||||
}
|
||||
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("Attempting connect to: " + uri);
|
||||
}
|
||||
SslContext.setCurrentSslContext(brokerSslContext);
|
||||
t = TransportFactory.compositeConnect(uri);
|
||||
t.setTransportListener(myTransportListener);
|
||||
t.start();
|
||||
transport.setTransportListener(myTransportListener);
|
||||
transport.start();
|
||||
|
||||
if (started) {
|
||||
restoreTransport(t);
|
||||
restoreTransport(transport);
|
||||
}
|
||||
|
||||
LOG.debug("Connection established");
|
||||
reconnectDelay = initialReconnectDelay;
|
||||
connectedTransportURI = uri;
|
||||
connectedTransport.set(t);
|
||||
connectedTransport.set(transport);
|
||||
reconnectMutex.notifyAll();
|
||||
connectFailures = 0;
|
||||
// Make sure on initial startup, that the
|
||||
// transportListener
|
||||
|
||||
// Make sure on initial startup, that the transportListener
|
||||
// has been initialized for this instance.
|
||||
synchronized (listenerMutex) {
|
||||
if (transportListener == null) {
|
||||
try {
|
||||
// if it isn't set after 2secs - it
|
||||
// probably never will be
|
||||
// if it isn't set after 2secs - it probably never will be
|
||||
listenerMutex.wait(2000);
|
||||
} catch (InterruptedException ex) {
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if (transportListener != null) {
|
||||
transportListener.transportResumed();
|
||||
} else {
|
||||
LOG.debug("transport resumed by transport listener not set");
|
||||
}
|
||||
|
||||
if (firstConnection) {
|
||||
firstConnection = false;
|
||||
LOG.info("Successfully connected to " + uri);
|
||||
} else {
|
||||
LOG.info("Successfully reconnected to " + uri);
|
||||
}
|
||||
|
||||
connected = true;
|
||||
return false;
|
||||
} catch (Exception e) {
|
||||
failure = e;
|
||||
LOG.debug("Connect fail to: " + uri + ", reason: " + e);
|
||||
if (t != null) {
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("Connect fail to: " + uri + ", reason: " + e);
|
||||
}
|
||||
if (transport != null) {
|
||||
try {
|
||||
t.stop();
|
||||
transport.stop();
|
||||
transport = null;
|
||||
} catch (Exception ee) {
|
||||
LOG.debug("Stop of failed transport: " + t + " failed with reason: " + ee);
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("Stop of failed transport: " + transport +
|
||||
" failed with reason: " + ee);
|
||||
}
|
||||
}
|
||||
}
|
||||
} finally {
|
||||
|
@ -919,21 +933,24 @@ public class FailoverTransport implements CompositeTransport {
|
|||
}
|
||||
}
|
||||
}
|
||||
|
||||
int reconnectAttempts = 0;
|
||||
if (firstConnection) {
|
||||
if (this.startupMaxReconnectAttempts != 0) {
|
||||
reconnectAttempts = this.startupMaxReconnectAttempts;
|
||||
}
|
||||
}
|
||||
|
||||
if (reconnectAttempts == 0) {
|
||||
reconnectAttempts = this.maxReconnectAttempts;
|
||||
}
|
||||
|
||||
if (reconnectAttempts > 0 && ++connectFailures >= reconnectAttempts) {
|
||||
LOG.error("Failed to connect to transport after: " + connectFailures + " attempt(s)");
|
||||
connectionFailure = failure;
|
||||
|
||||
// Make sure on initial startup, that the transportListener has
|
||||
// been initialized for this instance.
|
||||
// Make sure on initial startup, that the transportListener has been
|
||||
// initialized for this instance.
|
||||
synchronized (listenerMutex) {
|
||||
if (transportListener == null) {
|
||||
try {
|
||||
|
@ -1122,7 +1139,6 @@ public class FailoverTransport implements CompositeTransport {
|
|||
}
|
||||
|
||||
private boolean contains(URI newURI) {
|
||||
|
||||
boolean result = false;
|
||||
try {
|
||||
for (URI uri : uris) {
|
||||
|
|
|
@ -32,7 +32,6 @@ import org.apache.activemq.broker.BrokerService;
|
|||
import org.apache.activemq.broker.TransportConnector;
|
||||
import org.apache.activemq.network.NetworkConnector;
|
||||
|
||||
|
||||
public class FailoverClusterTest extends TestCase {
|
||||
|
||||
private static final int NUMBER = 10;
|
||||
|
@ -45,7 +44,6 @@ public class FailoverClusterTest extends TestCase {
|
|||
|
||||
private final List<ActiveMQConnection> connections = new ArrayList<ActiveMQConnection>();
|
||||
|
||||
|
||||
public void testClusterConnectedAfterClients() throws Exception {
|
||||
createClients();
|
||||
if (brokerB == null) {
|
||||
|
@ -73,7 +71,6 @@ public class FailoverClusterTest extends TestCase {
|
|||
assertTrue(set.size() > 1);
|
||||
}
|
||||
|
||||
|
||||
public void testClusterConnectedBeforeClients() throws Exception {
|
||||
|
||||
if (brokerB == null) {
|
||||
|
@ -151,6 +148,7 @@ public class FailoverClusterTest extends TestCase {
|
|||
answer.setUseShutdownHook(false);
|
||||
}
|
||||
|
||||
@SuppressWarnings("unused")
|
||||
protected void createClients() throws Exception {
|
||||
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(clientUrl);
|
||||
for (int i = 0; i < NUMBER; i++) {
|
||||
|
|
|
@ -100,6 +100,7 @@ public class FailoverConsumerOutstandingCommitTest {
|
|||
doTestFailoverConsumerDups(true);
|
||||
}
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
public void doTestFailoverConsumerDups(final boolean watchTopicAdvisories) throws Exception {
|
||||
|
||||
broker = createBroker(true);
|
||||
|
@ -138,7 +139,6 @@ public class FailoverConsumerOutstandingCommitTest {
|
|||
|
||||
final Session consumerSession = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
|
||||
|
||||
|
||||
final CountDownLatch commitDoneLatch = new CountDownLatch(1);
|
||||
final CountDownLatch messagesReceived = new CountDownLatch(2);
|
||||
|
||||
|
@ -196,6 +196,7 @@ public class FailoverConsumerOutstandingCommitTest {
|
|||
doTestFailoverConsumerOutstandingSendTx(true);
|
||||
}
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
public void doTestFailoverConsumerOutstandingSendTx(final boolean doActualBrokerCommit) throws Exception {
|
||||
final boolean watchTopicAdvisories = true;
|
||||
broker = createBroker(true);
|
||||
|
@ -240,7 +241,6 @@ public class FailoverConsumerOutstandingCommitTest {
|
|||
final Queue signalDestination = producerSession.createQueue(QUEUE_NAME + ".signal"
|
||||
+ "?consumer.prefetchSize=" + prefetch);
|
||||
|
||||
|
||||
final Session consumerSession = connection.createSession(true, Session.SESSION_TRANSACTED);
|
||||
|
||||
final CountDownLatch commitDoneLatch = new CountDownLatch(1);
|
||||
|
@ -295,7 +295,6 @@ public class FailoverConsumerOutstandingCommitTest {
|
|||
assertTrue("another message was received", messagesReceived.await(20, TimeUnit.SECONDS));
|
||||
assertEquals("get message 1 eventually", MESSAGE_TEXT + "1", receivedMessages.get(2).getText());
|
||||
|
||||
|
||||
connection.close();
|
||||
}
|
||||
|
||||
|
|
|
@ -95,6 +95,7 @@ public class FailoverConsumerUnconsumedTest {
|
|||
doTestFailoverConsumerDups(false);
|
||||
}
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
public void doTestFailoverConsumerDups(final boolean watchTopicAdvisories) throws Exception {
|
||||
|
||||
final int maxConsumers = 4;
|
||||
|
|
|
@ -79,6 +79,7 @@ public class FailoverPrefetchZeroTest {
|
|||
return broker;
|
||||
}
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
@Test
|
||||
public void testPrefetchZeroConsumerThroughRestart() throws Exception {
|
||||
broker = createBroker(true);
|
||||
|
|
|
@ -140,6 +140,7 @@ public class FailoverTransactionTest extends TestSupport {
|
|||
new Object[]{PersistenceAdapterChoice.KahaDB, PersistenceAdapterChoice.AMQ, PersistenceAdapterChoice.JDBC});
|
||||
}
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
public void testFailoverCommitReplyLost() throws Exception {
|
||||
|
||||
broker = createBroker(true);
|
||||
|
@ -234,15 +235,15 @@ public class FailoverTransactionTest extends TestSupport {
|
|||
connection.close();
|
||||
}
|
||||
|
||||
|
||||
public void initCombosForTestFailoverSendReplyLost() {
|
||||
addCombinationValues("defaultPersistenceAdapter",
|
||||
new Object[]{PersistenceAdapterChoice.KahaDB,
|
||||
PersistenceAdapterChoice.JDBC
|
||||
// not implemented for AMQ store
|
||||
});
|
||||
new Object[]{PersistenceAdapterChoice.KahaDB,
|
||||
PersistenceAdapterChoice.JDBC
|
||||
// not implemented for AMQ store
|
||||
});
|
||||
}
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
public void testFailoverSendReplyLost() throws Exception {
|
||||
|
||||
broker = createBroker(true);
|
||||
|
@ -341,15 +342,15 @@ public class FailoverTransactionTest extends TestSupport {
|
|||
connection.close();
|
||||
}
|
||||
|
||||
|
||||
public void initCombosForTestFailoverConnectionSendReplyLost() {
|
||||
addCombinationValues("defaultPersistenceAdapter",
|
||||
new Object[]{PersistenceAdapterChoice.KahaDB,
|
||||
PersistenceAdapterChoice.JDBC
|
||||
// last producer message id store feature not implemented for AMQ store
|
||||
});
|
||||
new Object[]{PersistenceAdapterChoice.KahaDB,
|
||||
PersistenceAdapterChoice.JDBC
|
||||
// last producer message id store feature not implemented for AMQ store
|
||||
});
|
||||
}
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
public void testFailoverConnectionSendReplyLost() throws Exception {
|
||||
|
||||
broker = createBroker(true);
|
||||
|
@ -579,6 +580,7 @@ public class FailoverTransactionTest extends TestSupport {
|
|||
}
|
||||
}
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
public void doTestFailoverConsumerAckLost(final int pauseSeconds) throws Exception {
|
||||
broker = createBroker(true);
|
||||
setDefaultPersistenceAdapter(broker);
|
||||
|
@ -681,7 +683,6 @@ public class FailoverTransactionTest extends TestSupport {
|
|||
}
|
||||
});
|
||||
|
||||
|
||||
// will be stopped by the plugin
|
||||
broker.waitUntilStopped();
|
||||
broker = createBroker(false, url);
|
||||
|
@ -776,7 +777,6 @@ public class FailoverTransactionTest extends TestSupport {
|
|||
connection.close();
|
||||
}
|
||||
|
||||
|
||||
public void testWaitForMissingRedeliveries() throws Exception {
|
||||
LOG.info("testWaitForMissingRedeliveries()");
|
||||
broker = createBroker(true);
|
||||
|
@ -825,7 +825,6 @@ public class FailoverTransactionTest extends TestSupport {
|
|||
connection.close();
|
||||
}
|
||||
|
||||
|
||||
public void testPoisonOnDeliveryWhilePending() throws Exception {
|
||||
LOG.info("testPoisonOnDeliveryWhilePending()");
|
||||
broker = createBroker(true);
|
||||
|
|
|
@ -0,0 +1,222 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.activemq.transport.failover;
|
||||
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.assertNotNull;
|
||||
import static org.junit.Assert.assertTrue;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.net.URI;
|
||||
|
||||
import org.apache.activemq.broker.BrokerService;
|
||||
import org.apache.activemq.transport.Transport;
|
||||
import org.apache.activemq.transport.TransportFactory;
|
||||
import org.apache.activemq.transport.TransportListener;
|
||||
import org.apache.activemq.util.Wait;
|
||||
import org.junit.After;
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
public class FailoverTransportBackupsTest {
|
||||
|
||||
private static final Logger LOG = LoggerFactory.getLogger(FailoverTransportBackupsTest.class);
|
||||
|
||||
protected Transport transport;
|
||||
protected FailoverTransport failoverTransport;
|
||||
private int commandsReceived;
|
||||
private int exceptionReceived;
|
||||
private int transportInterruptions;
|
||||
private int transportResumptions;
|
||||
|
||||
BrokerService broker1;
|
||||
BrokerService broker2;
|
||||
BrokerService broker3;
|
||||
|
||||
@Before
|
||||
public void setUp() throws Exception {
|
||||
broker1 = createBroker("1");
|
||||
broker2 = createBroker("2");
|
||||
broker3 = createBroker("3");
|
||||
|
||||
broker1.start();
|
||||
broker2.start();
|
||||
broker3.start();
|
||||
|
||||
broker1.waitUntilStarted();
|
||||
broker2.waitUntilStarted();
|
||||
broker3.waitUntilStarted();
|
||||
|
||||
// Reset stats
|
||||
commandsReceived = 0;
|
||||
exceptionReceived = 0;
|
||||
transportInterruptions = 0;
|
||||
transportResumptions = 0;
|
||||
}
|
||||
|
||||
@After
|
||||
public void tearDown() throws Exception {
|
||||
if (transport != null) {
|
||||
transport.stop();
|
||||
}
|
||||
|
||||
broker1.stop();
|
||||
broker1.waitUntilStopped();
|
||||
broker2.stop();
|
||||
broker2.waitUntilStopped();
|
||||
broker3.stop();
|
||||
broker3.waitUntilStopped();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testBackupsAreCreated() throws Exception {
|
||||
this.transport = createTransport(2);
|
||||
assertNotNull(failoverTransport);
|
||||
assertTrue(failoverTransport.isBackup());
|
||||
assertEquals(2, failoverTransport.getBackupPoolSize());
|
||||
|
||||
assertTrue("Timed out waiting for Backups to connect.", Wait.waitFor(new Wait.Condition(){
|
||||
public boolean isSatisified() throws Exception {
|
||||
LOG.debug("Current Backup Count = " + failoverTransport.getCurrentBackups());
|
||||
return failoverTransport.getCurrentBackups() == 2;
|
||||
}
|
||||
}));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testFailoverToBackups() throws Exception {
|
||||
this.transport = createTransport(2);
|
||||
assertNotNull(failoverTransport);
|
||||
assertTrue(failoverTransport.isBackup());
|
||||
assertEquals(2, failoverTransport.getBackupPoolSize());
|
||||
|
||||
assertTrue("Timed out waiting for Backups to connect.", Wait.waitFor(new Wait.Condition(){
|
||||
public boolean isSatisified() throws Exception {
|
||||
LOG.debug("Current Backup Count = " + failoverTransport.getCurrentBackups());
|
||||
return failoverTransport.getCurrentBackups() == 2;
|
||||
}
|
||||
}));
|
||||
|
||||
broker1.stop();
|
||||
|
||||
assertTrue("Timed out waiting for Backups to connect.", Wait.waitFor(new Wait.Condition(){
|
||||
public boolean isSatisified() throws Exception {
|
||||
LOG.debug("Current Backup Count = " + failoverTransport.getCurrentBackups());
|
||||
return failoverTransport.getCurrentBackups() == 1;
|
||||
}
|
||||
}));
|
||||
|
||||
assertTrue("Incorrect number of Transport interruptions", transportInterruptions >= 1);
|
||||
assertTrue("Incorrect number of Transport resumptions", transportResumptions >= 1);
|
||||
|
||||
broker2.stop();
|
||||
|
||||
assertTrue("Timed out waiting for Backups to connect.", Wait.waitFor(new Wait.Condition(){
|
||||
public boolean isSatisified() throws Exception {
|
||||
LOG.debug("Current Backup Count = " + failoverTransport.getCurrentBackups());
|
||||
return failoverTransport.getCurrentBackups() == 0;
|
||||
}
|
||||
}));
|
||||
|
||||
assertTrue("Incorrect number of Transport interruptions", transportInterruptions >= 2);
|
||||
assertTrue("Incorrect number of Transport resumptions", transportResumptions >= 2);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testBackupsRefilled() throws Exception {
|
||||
this.transport = createTransport(1);
|
||||
assertNotNull(failoverTransport);
|
||||
assertTrue(failoverTransport.isBackup());
|
||||
assertEquals(1, failoverTransport.getBackupPoolSize());
|
||||
|
||||
assertTrue("Timed out waiting for Backups to connect.", Wait.waitFor(new Wait.Condition(){
|
||||
public boolean isSatisified() throws Exception {
|
||||
LOG.debug("Current Backup Count = " + failoverTransport.getCurrentBackups());
|
||||
return failoverTransport.getCurrentBackups() == 1;
|
||||
}
|
||||
}));
|
||||
|
||||
broker1.stop();
|
||||
|
||||
assertTrue("Timed out waiting for Backups to connect.", Wait.waitFor(new Wait.Condition(){
|
||||
public boolean isSatisified() throws Exception {
|
||||
LOG.debug("Current Backup Count = " + failoverTransport.getCurrentBackups());
|
||||
return failoverTransport.getCurrentBackups() == 1;
|
||||
}
|
||||
}));
|
||||
|
||||
broker2.stop();
|
||||
|
||||
assertTrue("Timed out waiting for Backups to connect.", Wait.waitFor(new Wait.Condition(){
|
||||
public boolean isSatisified() throws Exception {
|
||||
LOG.debug("Current Backup Count = " + failoverTransport.getCurrentBackups());
|
||||
return failoverTransport.getCurrentBackups() == 0;
|
||||
}
|
||||
}));
|
||||
}
|
||||
|
||||
private BrokerService createBroker(String name) throws Exception {
|
||||
BrokerService bs = new BrokerService();
|
||||
bs.setBrokerName(name);
|
||||
bs.setUseJmx(false);
|
||||
bs.setPersistent(false);
|
||||
bs.addConnector("tcp://localhost:0");
|
||||
return bs;
|
||||
}
|
||||
|
||||
protected Transport createTransport(int backups) throws Exception {
|
||||
String connectionUri = "failover://("+
|
||||
broker1.getTransportConnectors().get(0).getPublishableConnectString() + "," +
|
||||
broker2.getTransportConnectors().get(0).getPublishableConnectString() + "," +
|
||||
broker3.getTransportConnectors().get(0).getPublishableConnectString() + ")";
|
||||
|
||||
if (backups > 0) {
|
||||
connectionUri += "?randomize=false&backup=true&backupPoolSize=" + backups;
|
||||
}
|
||||
|
||||
Transport transport = TransportFactory.connect(new URI(connectionUri));
|
||||
transport.setTransportListener(new TransportListener() {
|
||||
|
||||
public void onCommand(Object command) {
|
||||
LOG.debug("Test Transport Listener received Command: " + command);
|
||||
commandsReceived++;
|
||||
}
|
||||
|
||||
public void onException(IOException error) {
|
||||
LOG.debug("Test Transport Listener received Exception: " + error);
|
||||
exceptionReceived++;
|
||||
}
|
||||
|
||||
public void transportInterupted() {
|
||||
transportInterruptions++;
|
||||
LOG.debug("Test Transport Listener records transport Interrupted: " + transportInterruptions);
|
||||
}
|
||||
|
||||
public void transportResumed() {
|
||||
transportResumptions++;
|
||||
LOG.debug("Test Transport Listener records transport Resumed: " + transportResumptions);
|
||||
}
|
||||
});
|
||||
transport.start();
|
||||
|
||||
this.failoverTransport = transport.narrow(FailoverTransport.class);
|
||||
|
||||
return transport;
|
||||
}
|
||||
}
|
|
@ -37,7 +37,6 @@ import org.apache.activemq.network.NetworkTestSupport;
|
|||
import org.apache.activemq.transport.Transport;
|
||||
import org.apache.activemq.transport.TransportFactory;
|
||||
import org.apache.activemq.transport.TransportListener;
|
||||
import org.apache.activemq.transport.multicast.MulticastTransportTest;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
|
@ -143,6 +142,7 @@ public class FailoverTransportBrokerTest extends NetworkTestSupport {
|
|||
//To change body of implemented methods use File | Settings | File Templates.
|
||||
}
|
||||
};
|
||||
@SuppressWarnings("unused")
|
||||
StubConnection c = createFailoverConnection(listener);
|
||||
int count = 0;
|
||||
while(count++ < 20 && info[0] == null) {
|
||||
|
@ -160,6 +160,7 @@ public class FailoverTransportBrokerTest extends NetworkTestSupport {
|
|||
return "tcp://localhost:0?wireFormat.tcpNoDelayEnabled=true";
|
||||
}
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
protected StubConnection createFailoverConnection(TransportListener listener) throws Exception {
|
||||
URI failoverURI = new URI("failover://" + connector.getServer().getConnectURI() + "," + remoteConnector.getServer().getConnectURI() + "");
|
||||
Transport transport = TransportFactory.connect(failoverURI);
|
||||
|
|
Loading…
Reference in New Issue