https://issues.apache.org/jira/browse/AMQ-6124 - fix and test - propagate broker info from prestarted backup transport

This commit is contained in:
gtully 2016-01-12 14:00:13 +00:00
parent 66cfc7bab3
commit db1506a592
5 changed files with 39 additions and 4 deletions

View File

@ -18,6 +18,7 @@
package org.apache.activemq.transport.failover;
import org.apache.activemq.command.BrokerInfo;
import org.apache.activemq.transport.DefaultTransportListener;
import org.apache.activemq.transport.Transport;
@ -29,10 +30,12 @@ class BackupTransport extends DefaultTransportListener{
private Transport transport;
private URI uri;
private boolean disposed;
private BrokerInfo brokerInfo;
BackupTransport(FailoverTransport ft){
this.failoverTransport=ft;
}
@Override
public void onException(IOException error) {
this.disposed=true;
@ -41,6 +44,17 @@ class BackupTransport extends DefaultTransportListener{
}
}
@Override
public void onCommand(Object command) {
if (command instanceof BrokerInfo) {
brokerInfo = (BrokerInfo) command;
}
}
public BrokerInfo getBrokerInfo() {
return brokerInfo;
}
public Transport getTransport() {
return transport;
}

View File

@ -991,6 +991,7 @@ public class FailoverTransport implements CompositeTransport {
backups.remove(bt);
transport = bt.getTransport();
uri = bt.getUri();
myTransportListener.onCommand(bt.getBrokerInfo());
if (priorityBackup && priorityBackupAvailable) {
Transport old = this.connectedTransport.getAndSet(null);
if (old != null) {

View File

@ -110,6 +110,12 @@ public class FailoverClusterTestSupport extends TestCase {
}
}
protected void assertBrokerInfo(String brokerName) throws Exception {
for (ActiveMQConnection c : connections) {
assertEquals(brokerName, c.getBrokerInfo().getBrokerName());
}
}
protected void addBroker(String name, BrokerService brokerService) {
brokers.put(name, brokerService);
}

View File

@ -52,7 +52,7 @@ public class FailoverPriorityTest extends FailoverClusterTestSupport {
createClients(5);
assertAllConnectedTo(urls.get(BROKER_A_NAME));
assertBrokerInfo(BROKER_A_NAME);
restart(false, BROKER_A_NAME, BROKER_B_NAME);
@ -169,8 +169,10 @@ public class FailoverPriorityTest extends FailoverClusterTestSupport {
if (primary) {
assertAllConnectedTo(urls.get(secondaryName));
assertBrokerInfo(secondaryName);
} else {
assertAllConnectedTo(urls.get(primaryName));
assertBrokerInfo(primaryName);
}
if (primary) {
@ -186,6 +188,7 @@ public class FailoverPriorityTest extends FailoverClusterTestSupport {
Thread.sleep(5000);
assertAllConnectedTo(urls.get(primaryName));
assertBrokerInfo(primaryName);
}

View File

@ -24,6 +24,8 @@ import java.io.IOException;
import java.net.URI;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.command.BrokerInfo;
import org.apache.activemq.command.Command;
import org.apache.activemq.transport.Transport;
import org.apache.activemq.transport.TransportFactory;
import org.apache.activemq.transport.TransportListener;
@ -111,9 +113,10 @@ public class FailoverTransportBackupsTest {
}
}));
assertEquals("conected to..", "1", currentBrokerInfo.getBrokerName());
broker1.stop();
assertTrue("Timed out waiting for Backups to connect.", Wait.waitFor(new Wait.Condition(){
assertTrue("Timed out waiting for Backups to connect.", Wait.waitFor(new Wait.Condition() {
@Override
public boolean isSatisified() throws Exception {
LOG.debug("Current Backup Count = " + failoverTransport.getCurrentBackups());
@ -124,9 +127,10 @@ public class FailoverTransportBackupsTest {
assertTrue("Incorrect number of Transport interruptions", transportInterruptions >= 1);
assertTrue("Incorrect number of Transport resumptions", transportResumptions >= 1);
assertEquals("conected to..", "2", currentBrokerInfo.getBrokerName());
broker2.stop();
assertTrue("Timed out waiting for Backups to connect.", Wait.waitFor(new Wait.Condition(){
assertTrue("Timed out waiting for Backups to connect.", Wait.waitFor(new Wait.Condition() {
@Override
public boolean isSatisified() throws Exception {
LOG.debug("Current Backup Count = " + failoverTransport.getCurrentBackups());
@ -136,6 +140,8 @@ public class FailoverTransportBackupsTest {
assertTrue("Incorrect number of Transport interruptions", transportInterruptions >= 2);
assertTrue("Incorrect number of Transport resumptions", transportResumptions >= 2);
assertEquals("conected to..", "3", currentBrokerInfo.getBrokerName());
}
@Test
@ -183,6 +189,7 @@ public class FailoverTransportBackupsTest {
return bs;
}
BrokerInfo currentBrokerInfo;
protected Transport createTransport(int backups) throws Exception {
String connectionUri = "failover://("+
broker1.getTransportConnectors().get(0).getPublishableConnectString() + "," +
@ -199,6 +206,10 @@ public class FailoverTransportBackupsTest {
@Override
public void onCommand(Object command) {
LOG.debug("Test Transport Listener received Command: " + command);
if (command instanceof BrokerInfo) {
currentBrokerInfo = (BrokerInfo) command;
LOG.info("BrokerInfo: " + currentBrokerInfo);
}
}
@Override