mirror of https://github.com/apache/activemq.git
https://issues.apache.org/jira/browse/AMQ-6124 - fix and test - propagate broker info from prestarted backup transport
(cherry picked from commit db1506a592
)
This commit is contained in:
parent
aa8b64420b
commit
741ee01e11
|
@ -18,6 +18,7 @@
|
|||
|
||||
package org.apache.activemq.transport.failover;
|
||||
|
||||
import org.apache.activemq.command.BrokerInfo;
|
||||
import org.apache.activemq.transport.DefaultTransportListener;
|
||||
import org.apache.activemq.transport.Transport;
|
||||
|
||||
|
@ -29,10 +30,12 @@ class BackupTransport extends DefaultTransportListener{
|
|||
private Transport transport;
|
||||
private URI uri;
|
||||
private boolean disposed;
|
||||
|
||||
private BrokerInfo brokerInfo;
|
||||
|
||||
BackupTransport(FailoverTransport ft){
|
||||
this.failoverTransport=ft;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onException(IOException error) {
|
||||
this.disposed=true;
|
||||
|
@ -41,6 +44,17 @@ class BackupTransport extends DefaultTransportListener{
|
|||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onCommand(Object command) {
|
||||
if (command instanceof BrokerInfo) {
|
||||
brokerInfo = (BrokerInfo) command;
|
||||
}
|
||||
}
|
||||
|
||||
public BrokerInfo getBrokerInfo() {
|
||||
return brokerInfo;
|
||||
}
|
||||
|
||||
public Transport getTransport() {
|
||||
return transport;
|
||||
}
|
||||
|
|
|
@ -991,6 +991,7 @@ public class FailoverTransport implements CompositeTransport {
|
|||
backups.remove(bt);
|
||||
transport = bt.getTransport();
|
||||
uri = bt.getUri();
|
||||
myTransportListener.onCommand(bt.getBrokerInfo());
|
||||
if (priorityBackup && priorityBackupAvailable) {
|
||||
Transport old = this.connectedTransport.getAndSet(null);
|
||||
if (old != null) {
|
||||
|
|
|
@ -110,6 +110,12 @@ public class FailoverClusterTestSupport extends TestCase {
|
|||
}
|
||||
}
|
||||
|
||||
protected void assertBrokerInfo(String brokerName) throws Exception {
|
||||
for (ActiveMQConnection c : connections) {
|
||||
assertEquals(brokerName, c.getBrokerInfo().getBrokerName());
|
||||
}
|
||||
}
|
||||
|
||||
protected void addBroker(String name, BrokerService brokerService) {
|
||||
brokers.put(name, brokerService);
|
||||
}
|
||||
|
|
|
@ -52,7 +52,7 @@ public class FailoverPriorityTest extends FailoverClusterTestSupport {
|
|||
createClients(5);
|
||||
|
||||
assertAllConnectedTo(urls.get(BROKER_A_NAME));
|
||||
|
||||
assertBrokerInfo(BROKER_A_NAME);
|
||||
|
||||
restart(false, BROKER_A_NAME, BROKER_B_NAME);
|
||||
|
||||
|
@ -169,8 +169,10 @@ public class FailoverPriorityTest extends FailoverClusterTestSupport {
|
|||
|
||||
if (primary) {
|
||||
assertAllConnectedTo(urls.get(secondaryName));
|
||||
assertBrokerInfo(secondaryName);
|
||||
} else {
|
||||
assertAllConnectedTo(urls.get(primaryName));
|
||||
assertBrokerInfo(primaryName);
|
||||
}
|
||||
|
||||
if (primary) {
|
||||
|
@ -186,6 +188,7 @@ public class FailoverPriorityTest extends FailoverClusterTestSupport {
|
|||
Thread.sleep(5000);
|
||||
|
||||
assertAllConnectedTo(urls.get(primaryName));
|
||||
assertBrokerInfo(primaryName);
|
||||
|
||||
}
|
||||
|
||||
|
|
|
@ -24,6 +24,8 @@ import java.io.IOException;
|
|||
import java.net.URI;
|
||||
|
||||
import org.apache.activemq.broker.BrokerService;
|
||||
import org.apache.activemq.command.BrokerInfo;
|
||||
import org.apache.activemq.command.Command;
|
||||
import org.apache.activemq.transport.Transport;
|
||||
import org.apache.activemq.transport.TransportFactory;
|
||||
import org.apache.activemq.transport.TransportListener;
|
||||
|
@ -111,9 +113,10 @@ public class FailoverTransportBackupsTest {
|
|||
}
|
||||
}));
|
||||
|
||||
assertEquals("conected to..", "1", currentBrokerInfo.getBrokerName());
|
||||
broker1.stop();
|
||||
|
||||
assertTrue("Timed out waiting for Backups to connect.", Wait.waitFor(new Wait.Condition(){
|
||||
assertTrue("Timed out waiting for Backups to connect.", Wait.waitFor(new Wait.Condition() {
|
||||
@Override
|
||||
public boolean isSatisified() throws Exception {
|
||||
LOG.debug("Current Backup Count = " + failoverTransport.getCurrentBackups());
|
||||
|
@ -124,9 +127,10 @@ public class FailoverTransportBackupsTest {
|
|||
assertTrue("Incorrect number of Transport interruptions", transportInterruptions >= 1);
|
||||
assertTrue("Incorrect number of Transport resumptions", transportResumptions >= 1);
|
||||
|
||||
assertEquals("conected to..", "2", currentBrokerInfo.getBrokerName());
|
||||
broker2.stop();
|
||||
|
||||
assertTrue("Timed out waiting for Backups to connect.", Wait.waitFor(new Wait.Condition(){
|
||||
assertTrue("Timed out waiting for Backups to connect.", Wait.waitFor(new Wait.Condition() {
|
||||
@Override
|
||||
public boolean isSatisified() throws Exception {
|
||||
LOG.debug("Current Backup Count = " + failoverTransport.getCurrentBackups());
|
||||
|
@ -136,6 +140,8 @@ public class FailoverTransportBackupsTest {
|
|||
|
||||
assertTrue("Incorrect number of Transport interruptions", transportInterruptions >= 2);
|
||||
assertTrue("Incorrect number of Transport resumptions", transportResumptions >= 2);
|
||||
|
||||
assertEquals("conected to..", "3", currentBrokerInfo.getBrokerName());
|
||||
}
|
||||
|
||||
@Test
|
||||
|
@ -183,6 +189,7 @@ public class FailoverTransportBackupsTest {
|
|||
return bs;
|
||||
}
|
||||
|
||||
BrokerInfo currentBrokerInfo;
|
||||
protected Transport createTransport(int backups) throws Exception {
|
||||
String connectionUri = "failover://("+
|
||||
broker1.getTransportConnectors().get(0).getPublishableConnectString() + "," +
|
||||
|
@ -199,6 +206,10 @@ public class FailoverTransportBackupsTest {
|
|||
@Override
|
||||
public void onCommand(Object command) {
|
||||
LOG.debug("Test Transport Listener received Command: " + command);
|
||||
if (command instanceof BrokerInfo) {
|
||||
currentBrokerInfo = (BrokerInfo) command;
|
||||
LOG.info("BrokerInfo: " + currentBrokerInfo);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
Loading…
Reference in New Issue