https://issues.apache.org/jira/browse/AMQ-3124 - Failover transport client gets corrupted connectedBrokers data - apply variant of suggested patch with thanks. Stripping server side url options is the right way to go, reusing api used by discovery publisher to same effect, additional test

git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@1057565 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Gary Tully 2011-01-11 10:28:43 +00:00
parent 55c3ef9fda
commit 0ed0ba584c
6 changed files with 160 additions and 89 deletions

View File

@ -29,7 +29,6 @@ import org.apache.activemq.broker.region.MessageReference;
import org.apache.activemq.broker.region.Subscription; import org.apache.activemq.broker.region.Subscription;
import org.apache.activemq.broker.region.TopicSubscription; import org.apache.activemq.broker.region.TopicSubscription;
import org.apache.activemq.command.*; import org.apache.activemq.command.*;
import org.apache.activemq.network.NetworkBridge;
import org.apache.activemq.security.SecurityContext; import org.apache.activemq.security.SecurityContext;
import org.apache.activemq.state.ProducerState; import org.apache.activemq.state.ProducerState;
import org.apache.activemq.usage.Usage; import org.apache.activemq.usage.Usage;
@ -484,8 +483,8 @@ public class AdvisoryBroker extends BrokerFilter {
advisoryMessage.setStringProperty(AdvisorySupport.MSG_PROPERTY_ORIGIN_BROKER_ID, id); advisoryMessage.setStringProperty(AdvisorySupport.MSG_PROPERTY_ORIGIN_BROKER_ID, id);
String url = getBrokerService().getVmConnectorURI().toString(); String url = getBrokerService().getVmConnectorURI().toString();
if (getBrokerService().getDefaultSocketURI() != null) { if (getBrokerService().getDefaultSocketURIString() != null) {
url = getBrokerService().getDefaultSocketURI().toString(); url = getBrokerService().getDefaultSocketURIString();
} }
advisoryMessage.setStringProperty(AdvisorySupport.MSG_PROPERTY_ORIGIN_BROKER_URL, url); advisoryMessage.setStringProperty(AdvisorySupport.MSG_PROPERTY_ORIGIN_BROKER_URL, url);

View File

@ -152,7 +152,7 @@ public class BrokerService implements Service {
private boolean deleteAllMessagesOnStartup; private boolean deleteAllMessagesOnStartup;
private boolean advisorySupport = true; private boolean advisorySupport = true;
private URI vmConnectorURI; private URI vmConnectorURI;
private URI defaultSocketURI; private String defaultSocketURIString;
private PolicyMap destinationPolicy; private PolicyMap destinationPolicy;
private final AtomicBoolean started = new AtomicBoolean(false); private final AtomicBoolean started = new AtomicBoolean(false);
private final AtomicBoolean stopped = new AtomicBoolean(false); private final AtomicBoolean stopped = new AtomicBoolean(false);
@ -1315,24 +1315,24 @@ public class BrokerService implements Service {
this.vmConnectorURI = vmConnectorURI; this.vmConnectorURI = vmConnectorURI;
} }
public URI getDefaultSocketURI() { public String getDefaultSocketURIString() {
if (started.get()) { if (started.get()) {
if (this.defaultSocketURI==null) { if (this.defaultSocketURIString ==null) {
for (TransportConnector tc:this.transportConnectors) { for (TransportConnector tc:this.transportConnectors) {
URI result = null; String result = null;
try { try {
result = tc.getConnectUri(); result = tc.getPublishableConnectString();
} catch (Exception e) { } catch (Exception e) {
LOG.warn("Failed to get the ConnectURI for "+tc,e); LOG.warn("Failed to get the ConnectURI for "+tc,e);
} }
if (result != null) { if (result != null) {
this.defaultSocketURI=result; this.defaultSocketURIString =result;
break; break;
} }
} }
} }
return this.defaultSocketURI; return this.defaultSocketURIString;
} }
return null; return null;
} }
@ -2076,8 +2076,8 @@ public class BrokerService implements Service {
connector.setLocalUri(uri); connector.setLocalUri(uri);
connector.setBrokerName(getBrokerName()); connector.setBrokerName(getBrokerName());
connector.setDurableDestinations(durableDestinations); connector.setDurableDestinations(durableDestinations);
if (getDefaultSocketURI() != null) { if (getDefaultSocketURIString() != null) {
connector.setBrokerURL(getDefaultSocketURI().toString()); connector.setBrokerURL(getDefaultSocketURIString());
} }
connector.start(); connector.start();
} }

View File

@ -584,7 +584,7 @@ public class TransportConnection implements Connection, Task, CommandVisitor {
ConnectionId connectionId = info.getSessionId().getParentId(); ConnectionId connectionId = info.getSessionId().getParentId();
TransportConnectionState cs = lookupConnectionState(connectionId); TransportConnectionState cs = lookupConnectionState(connectionId);
// Avoid replaying dup commands // Avoid replaying dup commands
if (!cs.getSessionIds().contains(info.getSessionId())) { if (cs != null && !cs.getSessionIds().contains(info.getSessionId())) {
broker.addSession(cs.getContext(), info); broker.addSession(cs.getContext(), info);
try { try {
cs.addSession(info); cs.addSession(info);

View File

@ -254,7 +254,7 @@ public class TransportConnector implements Connector, BrokerServiceAware {
LOG.info("Connector " + getName() + " Started"); LOG.info("Connector " + getName() + " Started");
} }
private String getPublishableConnectString() throws Exception { public String getPublishableConnectString() throws Exception {
URI theConnectURI = getConnectUri(); URI theConnectURI = getConnectUri();
String publishableConnectString = theConnectURI.toString(); String publishableConnectString = theConnectURI.toString();
// strip off server side query parameters which may not be compatible to // strip off server side query parameters which may not be compatible to
@ -386,8 +386,10 @@ public class TransportConnector implements Connector, BrokerServiceAware {
boolean rebalance = isRebalanceClusterClients(); boolean rebalance = isRebalanceClusterClients();
String connectedBrokers = ""; String connectedBrokers = "";
String self = ""; String self = "";
if (brokerService.getDefaultSocketURI() != null) {
self += brokerService.getDefaultSocketURI().toString(); if (isUpdateClusterClients()) {
if (brokerService.getDefaultSocketURIString() != null) {
self += brokerService.getDefaultSocketURIString();
self += ","; self += ",";
} }
if (rebalance == false) { if (rebalance == false) {
@ -404,6 +406,7 @@ public class TransportConnector implements Connector, BrokerServiceAware {
if (rebalance) { if (rebalance) {
connectedBrokers += self; connectedBrokers += self;
} }
}
ConnectionControl control = new ConnectionControl(); ConnectionControl control = new ConnectionControl();
control.setConnectedBrokers(connectedBrokers); control.setConnectedBrokers(connectedBrokers);

View File

@ -59,6 +59,21 @@ private final List<ActiveMQConnection>connections = new ArrayList<ActiveMQConnec
assertTrue(set.size() > 1); assertTrue(set.size() > 1);
} }
public void testClusterURIOptionsStrip() throws Exception{
createClients();
if (brokerB == null) {
// add in server side only url param, should not be propagated
brokerB = createBrokerB(BROKER_B_BIND_ADDRESS + "?transport.closeAsync=false");
}
Thread.sleep(3000);
Set<String> set = new HashSet<String>();
for (ActiveMQConnection c:connections) {
set.add(c.getTransportChannel().getRemoteAddress());
}
assertTrue(set.size() > 1);
}
public void testClusterConnectedBeforeClients() throws Exception{ public void testClusterConnectedBeforeClients() throws Exception{
if (brokerB == null) { if (brokerB == null) {
@ -80,7 +95,7 @@ private final List<ActiveMQConnection>connections = new ArrayList<ActiveMQConnec
@Override @Override
protected void setUp() throws Exception { protected void setUp() throws Exception {
if (brokerA == null) { if (brokerA == null) {
brokerA = createBrokerA(BROKER_A_BIND_ADDRESS); brokerA = createBrokerA(BROKER_A_BIND_ADDRESS + "?transport.closeAsync=false");
} }
@ -103,6 +118,7 @@ private final List<ActiveMQConnection>connections = new ArrayList<ActiveMQConnec
protected BrokerService createBrokerA(String uri) throws Exception { protected BrokerService createBrokerA(String uri) throws Exception {
BrokerService answer = new BrokerService(); BrokerService answer = new BrokerService();
answer.setUseJmx(false);
configureConsumerBroker(answer,uri); configureConsumerBroker(answer,uri);
answer.start(); answer.start();
return answer; return answer;
@ -119,6 +135,7 @@ private final List<ActiveMQConnection>connections = new ArrayList<ActiveMQConnec
protected BrokerService createBrokerB(String uri) throws Exception { protected BrokerService createBrokerB(String uri) throws Exception {
BrokerService answer = new BrokerService(); BrokerService answer = new BrokerService();
answer.setUseJmx(false);
configureNetwork(answer,uri); configureNetwork(answer,uri);
answer.start(); answer.start();
return answer; return answer;

View File

@ -18,50 +18,58 @@ package org.apache.activemq.transport.failover;
import java.io.File; import java.io.File;
import java.io.FileOutputStream; import java.io.FileOutputStream;
import java.net.URI; import java.util.concurrent.TimeUnit;
import javax.jms.Connection; import javax.jms.Connection;
import javax.jms.JMSException;
import javax.jms.Message; import javax.jms.Message;
import javax.jms.MessageProducer;
import javax.jms.MessageConsumer; import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Queue; import javax.jms.Queue;
import javax.jms.Session; import javax.jms.Session;
import javax.jms.TextMessage;
import junit.framework.TestCase; import junit.framework.TestCase;
import org.apache.activemq.ActiveMQConnectionFactory; import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.broker.BrokerService; import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.TransportConnector;
import org.apache.activemq.network.NetworkConnector;
import org.apache.log4j.Logger;
public class FailoverUpdateURIsTest extends TestCase { public class FailoverUpdateURIsTest extends TestCase {
private static final String QUEUE_NAME = "test.failoverupdateuris"; private static final String QUEUE_NAME = "test.failoverupdateuris";
private static final Logger LOG = Logger.getLogger(FailoverUpdateURIsTest.class);
public void testUpdateURIs() throws Exception { String firstTcpUri = "tcp://localhost:61616";
String secondTcpUri = "tcp://localhost:61626";
Connection connection = null;
BrokerService bs2 = null;
public void tearDown() throws Exception {
if (connection != null) {
connection.close();
}
if (bs2 != null) {
bs2.stop();
}
}
public void testUpdateURIsViaFile() throws Exception {
long timeout = 1000;
URI firstTcpUri = new URI("tcp://localhost:61616");
URI secondTcpUri = new URI("tcp://localhost:61626");
String targetDir = "target/" + getName(); String targetDir = "target/" + getName();
new File(targetDir).mkdir(); new File(targetDir).mkdir();
File updateFile = new File(targetDir + "/updateURIsFile.txt"); File updateFile = new File(targetDir + "/updateURIsFile.txt");
System.out.println(updateFile); LOG.info(updateFile);
System.out.println(updateFile.toURI()); LOG.info(updateFile.toURI());
System.out.println(updateFile.getAbsoluteFile()); LOG.info(updateFile.getAbsoluteFile());
System.out.println(updateFile.getAbsoluteFile().toURI()); LOG.info(updateFile.getAbsoluteFile().toURI());
FileOutputStream out = new FileOutputStream(updateFile); FileOutputStream out = new FileOutputStream(updateFile);
out.write(firstTcpUri.toString().getBytes()); out.write(firstTcpUri.getBytes());
out.close(); out.close();
BrokerService bs1 = new BrokerService(); BrokerService bs1 = createBroker("bs1", firstTcpUri);
bs1.setUseJmx(false);
bs1.addConnector(firstTcpUri);
bs1.start(); bs1.start();
// no failover uri's to start with, must be read from file... // no failover uri's to start with, must be read from file...
ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("failover:()?updateURIsURL=file:///" + updateFile.getAbsoluteFile()); ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("failover:()?updateURIsURL=file:///" + updateFile.getAbsoluteFile());
Connection connection = cf.createConnection(); connection = cf.createConnection();
connection.start(); connection.start();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Queue theQueue = session.createQueue(QUEUE_NAME); Queue theQueue = session.createQueue(QUEUE_NAME);
@ -75,9 +83,7 @@ public class FailoverUpdateURIsTest extends TestCase {
bs1.stop(); bs1.stop();
bs1.waitUntilStopped(); bs1.waitUntilStopped();
BrokerService bs2 = new BrokerService(); bs2 = createBroker("bs2", secondTcpUri);
bs2.setUseJmx(false);
bs2.addConnector(secondTcpUri);
bs2.start(); bs2.start();
// add the transport uri for broker number 2 // add the transport uri for broker number 2
@ -91,4 +97,50 @@ public class FailoverUpdateURIsTest extends TestCase {
assertNotNull(msg); assertNotNull(msg);
} }
private BrokerService createBroker(String name, String tcpUri) throws Exception {
BrokerService bs = new BrokerService();
bs.setBrokerName(name);
bs.setUseJmx(false);
bs.setPersistent(false);
bs.addConnector(tcpUri);
return bs;
}
public void testAutoUpdateURIs() throws Exception {
BrokerService bs1 = new BrokerService();
bs1.setUseJmx(false);
TransportConnector transportConnector = bs1.addConnector(firstTcpUri);
transportConnector.setUpdateClusterClients(true);
bs1.start();
ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("failover:(" + firstTcpUri + ")");
Connection connection = cf.createConnection();
connection.start();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Queue theQueue = session.createQueue(QUEUE_NAME);
MessageProducer producer = session.createProducer(theQueue);
MessageConsumer consumer = session.createConsumer(theQueue);
Message message = session.createTextMessage("Test message");
producer.send(message);
Message msg = consumer.receive(4000);
assertNotNull(msg);
bs2 = createBroker("bs2", secondTcpUri);
NetworkConnector networkConnector = bs2.addNetworkConnector("static:(" + firstTcpUri + ")");
networkConnector.setDuplex(true);
bs2.start();
LOG.info("started brokerService 2");
bs2.waitUntilStarted();
TimeUnit.SECONDS.sleep(4);
LOG.info("stopping brokerService 1");
bs1.stop();
bs1.waitUntilStopped();
producer.send(message);
msg = consumer.receive(4000);
assertNotNull(msg);
}
} }